source: trunk/src/allmydata/test/web/test_web.py

Last change on this file was c838967a, checked in by Jean-Paul Calderone <exarkun@…>, at 2023-07-11T20:15:56Z

Improve the name and type annotation of the tempfile factory

  • Property mode set to 100644
File size: 219.2 KB
Line 
1"""
2Tests for a bunch of web-related APIs.
3"""
4from __future__ import annotations
5
6from six import ensure_binary
7
8import os.path, re, time
9import treq
10from urllib.parse import quote as urlquote, unquote as urlunquote
11from base64 import urlsafe_b64encode
12
13from bs4 import BeautifulSoup
14
15from twisted.python.filepath import (
16    FilePath,
17)
18from twisted.application import service
19from twisted.internet import defer
20from twisted.internet.defer import inlineCallbacks, returnValue
21from twisted.internet.task import Clock
22from twisted.web import client, error, http
23from twisted.python import failure, log
24
25from allmydata import interfaces, uri, webish
26from allmydata.storage_client import StorageFarmBroker, StubServer
27from allmydata.immutable import upload
28from allmydata.immutable.downloader.status import DownloadStatus
29from allmydata.dirnode import DirectoryNode
30from allmydata.nodemaker import NodeMaker
31from allmydata.web.common import MultiFormatResource
32from allmydata.util import fileutil, base32, hashutil, jsonbytes as json
33from allmydata.util.consumer import download_to_data
34from allmydata.util.encodingutil import to_bytes
35from ...util.connection_status import ConnectionStatus
36from ...crypto.rsa import PublicKey, PrivateKey, create_signing_keypair, der_string_from_signing_key
37from ..common import (
38    EMPTY_CLIENT_CONFIG,
39    FakeCHKFileNode,
40    FakeMutableFileNode,
41    create_chk_filenode,
42    WebErrorMixin,
43    make_mutable_file_uri,
44    create_mutable_filenode,
45    TrialTestCase,
46)
47from .common import (
48    assert_soup_has_favicon,
49    assert_soup_has_text,
50    assert_soup_has_tag_with_attributes,
51    assert_soup_has_tag_with_content,
52    assert_soup_has_tag_with_attributes_and_content,
53    unknown_rwcap,
54    unknown_rocap,
55    unknown_immcap,
56)
57
58from allmydata.interfaces import (
59    IMutableFileNode, SDMF_VERSION, MDMF_VERSION,
60    FileTooLargeError,
61    MustBeReadonlyError,
62)
63from allmydata.mutable import servermap, publish, retrieve
64from allmydata.mutable.common import derive_mutable_keys
65from .. import common_util as testutil
66from ..common_util import TimezoneMixin
67from ..common_web import (
68    do_http,
69    Error,
70    render,
71)
72from ...web.common import (
73    humanize_exception,
74)
75
76from allmydata.client import _Client, SecretHolder
77
78# create a fake uploader/downloader, and a couple of fake dirnodes, then
79# create a webserver that works against them
80
81class FakeStatsProvider(object):
82    def get_stats(self):
83        stats = {'stats': {}, 'counters': {}}
84        return stats
85
86class FakeNodeMaker(NodeMaker):
87    encoding_params = {
88        'k': 3,
89        'n': 10,
90        'happy': 7,
91        'max_segment_size':128*1024 # 1024=KiB
92    }
93    all_contents: dict[bytes, object]
94    def _create_lit(self, cap):
95        return FakeCHKFileNode(cap, self.all_contents)
96    def _create_immutable(self, cap):
97        return FakeCHKFileNode(cap, self.all_contents)
98    def _create_mutable(self, cap):
99        return FakeMutableFileNode(None, None,
100                                   self.encoding_params, None,
101                                   self.all_contents, None).init_from_cap(cap)
102    def create_mutable_file(self,
103                            contents=None,
104                            version=None,
105                            keypair: tuple[PublicKey, PrivateKey] | None=None,
106                            ):
107        if contents is None:
108            contents = b""
109        if version is None:
110            version = SDMF_VERSION
111
112        n = FakeMutableFileNode(None, None, self.encoding_params, None,
113                                self.all_contents, keypair)
114        return n.create(contents, version=version)
115
116class FakeUploader(service.Service):
117    name = "uploader" # type: ignore # https://twistedmatrix.com/trac/ticket/10135
118    helper_furl = None
119    helper_connected = False
120
121    def upload(self, uploadable, **kw):
122        d = uploadable.get_size()
123        d.addCallback(lambda size: uploadable.read(size))
124        def _got_data(datav):
125            data = b"".join(datav)
126            n = create_chk_filenode(data, self.all_contents)
127            ur = upload.UploadResults(file_size=len(data),
128                                      ciphertext_fetched=0,
129                                      preexisting_shares=0,
130                                      pushed_shares=10,
131                                      sharemap={},
132                                      servermap={},
133                                      timings={},
134                                      uri_extension_data={},
135                                      uri_extension_hash=b"fake",
136                                      verifycapstr=b"fakevcap")
137            ur.set_uri(n.get_uri())
138            return ur
139        d.addCallback(_got_data)
140        return d
141
142    def get_helper_info(self):
143        return (self.helper_furl, self.helper_connected)
144
145
146def build_one_ds():
147    ds = DownloadStatus(b"storage_index", 1234)
148    now = time.time()
149
150    serverA = StubServer(hashutil.tagged_hash(b"foo", b"serverid_a")[:20])
151    serverB = StubServer(hashutil.tagged_hash(b"foo", b"serverid_b")[:20])
152    storage_index = hashutil.storage_index_hash(b"SI")
153    e0 = ds.add_segment_request(0, now)
154    e0.activate(now+0.5)
155    e0.deliver(now+1, 0, 100, 0.5) # when, start,len, decodetime
156    e1 = ds.add_segment_request(1, now+2)
157    e1.error(now+3)
158    # two outstanding requests
159    e2 = ds.add_segment_request(2, now+4)
160    e3 = ds.add_segment_request(3, now+5)
161    del e2,e3 # hush pyflakes
162
163    # simulate a segment which gets delivered faster than a system clock tick (ticket #1166)
164    e = ds.add_segment_request(4, now)
165    e.activate(now)
166    e.deliver(now, 0, 140, 0.5)
167
168    e = ds.add_dyhb_request(serverA, now)
169    e.finished([1,2], now+1)
170    e = ds.add_dyhb_request(serverB, now+2) # left unfinished
171
172    e = ds.add_read_event(0, 120, now)
173    e.update(60, 0.5, 0.1) # bytes, decrypttime, pausetime
174    e.finished(now+1)
175    e = ds.add_read_event(120, 30, now+2) # left unfinished
176
177    e = ds.add_block_request(serverA, 1, 100, 20, now)
178    e.finished(20, now+1)
179    e = ds.add_block_request(serverB, 1, 120, 30, now+1) # left unfinished
180
181    # make sure that add_read_event() can come first too
182    ds1 = DownloadStatus(storage_index, 1234)
183    e = ds1.add_read_event(0, 120, now)
184    e.update(60, 0.5, 0.1) # bytes, decrypttime, pausetime
185    e.finished(now+1)
186
187    return ds
188
189class FakeHistory(object):
190    _all_upload_status = [upload.UploadStatus()]
191    _all_download_status = [build_one_ds()]
192    _all_mapupdate_statuses = [servermap.UpdateStatus()]
193    _all_publish_statuses = [publish.PublishStatus()]
194    _all_retrieve_statuses = [retrieve.RetrieveStatus()]
195
196    def list_all_upload_statuses(self):
197        return self._all_upload_status
198    def list_all_download_statuses(self):
199        return self._all_download_status
200    def list_all_mapupdate_statuses(self):
201        return self._all_mapupdate_statuses
202    def list_all_publish_statuses(self):
203        return self._all_publish_statuses
204    def list_all_retrieve_statuses(self):
205        return self._all_retrieve_statuses
206    def list_all_helper_statuses(self):
207        return []
208
209class FakeDisplayableServer(StubServer):  # type: ignore  # tahoe-lafs/ticket/3573
210    def __init__(self, serverid, nickname, connected,
211                 last_connect_time, last_loss_time, last_rx_time):
212        StubServer.__init__(self, serverid)
213        self.announcement = {"my-version": "tahoe-lafs-fake",
214                             "service-name": "storage",
215                             "nickname": nickname}
216        self.connected = connected
217        self.last_loss_time = last_loss_time
218        self.last_rx_time = last_rx_time
219        self.last_connect_time = last_connect_time
220
221    def on_status_changed(self, cb): # TODO: try to remove me
222        cb(self)
223    def is_connected(self): # TODO: remove me
224        return self.connected
225    def get_version(self):
226        return {
227            b"application-version": b"1.0"
228        }
229    def get_permutation_seed(self):
230        return b""
231    def get_announcement(self):
232        return self.announcement
233    def get_nickname(self):
234        return self.announcement["nickname"]
235    def get_available_space(self):
236        return 123456
237    def get_connection_status(self):
238        return ConnectionStatus(self.connected, "summary", {},
239                                self.last_connect_time, self.last_rx_time)
240
241class FakeBucketCounter(object):
242    def get_state(self):
243        return {"last-complete-bucket-count": 0}
244    def get_progress(self):
245        return {"estimated-time-per-cycle": 0,
246                "cycle-in-progress": False,
247                "remaining-wait-time": 0}
248
249class FakeLeaseChecker(object):
250    def __init__(self):
251        self.expiration_enabled = False
252        self.mode = "age"
253        self.override_lease_duration = None
254        self.sharetypes_to_expire = {}
255    def get_state(self):
256        return {"history": None}
257    def get_progress(self):
258        return {"estimated-time-per-cycle": 0,
259                "cycle-in-progress": False,
260                "remaining-wait-time": 0}
261
262class FakeStorageServer(service.MultiService):
263    name = 'storage' # type: ignore # https://twistedmatrix.com/trac/ticket/10135
264    def __init__(self, nodeid, nickname):
265        service.MultiService.__init__(self)
266        self.my_nodeid = nodeid
267        self.nickname = nickname
268        self.bucket_counter = FakeBucketCounter()
269        self.lease_checker = FakeLeaseChecker()
270    def get_stats(self):
271        return {"storage_server.accepting_immutable_shares": False}
272    def on_status_changed(self, cb):
273        cb(self)
274
275class FakeClient(_Client):  # type: ignore  # tahoe-lafs/ticket/3573
276    def __init__(self):
277        # don't upcall to Client.__init__, since we only want to initialize a
278        # minimal subset
279        service.MultiService.__init__(self)
280        self.all_contents = {}
281        self.nodeid = b"fake_nodeid"
282        self.nickname = u"fake_nickname \u263A"
283        self.introducer_furls = []
284        self.introducer_clients = []
285        self.stats_provider = FakeStatsProvider()
286        self._secret_holder = SecretHolder(b"lease secret", b"convergence secret")
287        self.helper = None
288        self.convergence = b"some random string"
289        self.storage_broker = StorageFarmBroker(
290            permute_peers=True,
291            tub_maker=None,
292            node_config=EMPTY_CLIENT_CONFIG,
293        )
294        # fake knowledge of another server
295        self.storage_broker.test_add_server(b"other_nodeid",
296            FakeDisplayableServer(
297                serverid=b"other_nodeid", nickname=u"other_nickname \u263B", connected = True,
298                last_connect_time = 10, last_loss_time = 20, last_rx_time = 30))
299        self.storage_broker.test_add_server(b"disconnected_nodeid",
300            FakeDisplayableServer(
301                serverid=b"disconnected_nodeid", nickname=u"disconnected_nickname \u263B", connected = False,
302                last_connect_time = None, last_loss_time = 25, last_rx_time = 35))
303        self.introducer_client = None
304        self.history = FakeHistory()
305        self.uploader = FakeUploader()
306        self.uploader.all_contents = self.all_contents
307        self.uploader.setServiceParent(self)
308        self.blacklist = None
309        self.nodemaker = FakeNodeMaker(None, self._secret_holder, None,
310                                       self.uploader, None,
311                                       None, None, None)
312        self.nodemaker.all_contents = self.all_contents
313        self.mutable_file_default = SDMF_VERSION
314        self.addService(FakeStorageServer(self.nodeid, self.nickname))
315
316    def get_long_nodeid(self):
317        return b"v0-nodeid"
318    def get_long_tubid(self):
319        return u"tubid"
320
321    def get_auth_token(self):
322        return b'a fake debug auth token'
323
324    def startService(self):
325        return service.MultiService.startService(self)
326    def stopService(self):
327        return service.MultiService.stopService(self)
328
329    MUTABLE_SIZELIMIT = FakeMutableFileNode.MUTABLE_SIZELIMIT
330
331class WebMixin(TimezoneMixin):
332    def setUp(self):
333        self.setTimezone('UTC-13:00')
334        self.s = FakeClient()
335        self.s.startService()
336        self.staticdir = self.mktemp()
337        self.clock = Clock()
338        self.fakeTime = 86460 # 1d 0h 1m 0s
339        tempdir = FilePath(self.mktemp())
340        tempdir.makedirs()
341        self.ws = webish.WebishServer(
342            self.s,
343            "0",
344            webish.anonymous_tempfile_factory(tempdir.path),
345            staticdir=self.staticdir,
346            clock=self.clock,
347            now_fn=lambda:self.fakeTime,
348        )
349        self.ws.setServiceParent(self.s)
350        self.webish_port = self.ws.getPortnum()
351        self.webish_url = self.ws.getURL()
352        assert self.webish_url.endswith("/")
353        self.webish_url = self.webish_url[:-1] # these tests add their own /
354
355        l = [ self.s.create_dirnode() for x in range(6) ]
356        d = defer.DeferredList(l)
357        def _then(res):
358            self.public_root = res[0][1]
359            assert interfaces.IDirectoryNode.providedBy(self.public_root), res
360            self.public_url = "/uri/" + str(self.public_root.get_uri(), "ascii")
361            self.private_root = res[1][1]
362
363            foo = res[2][1]
364            self._foo_node = foo
365            self._foo_uri = foo.get_uri()
366            self._foo_readonly_uri = foo.get_readonly_uri()
367            self._foo_verifycap = foo.get_verify_cap().to_string()
368            # NOTE: we ignore the deferred on all set_uri() calls, because we
369            # know the fake nodes do these synchronously
370            self.public_root.set_uri(u"foo", foo.get_uri(),
371                                     foo.get_readonly_uri())
372
373            self.BAR_CONTENTS, n, self._bar_txt_uri = self.makefile(0)
374            foo.set_uri(u"bar.txt", self._bar_txt_uri, self._bar_txt_uri)
375            self._bar_txt_verifycap = n.get_verify_cap().to_string()
376
377            # sdmf
378            # XXX: Do we ever use this?
379            self.BAZ_CONTENTS, n, self._baz_txt_uri, self._baz_txt_readonly_uri = self.makefile_mutable(0)
380
381            foo.set_uri(u"baz.txt", self._baz_txt_uri, self._baz_txt_readonly_uri)
382
383            # mdmf
384            self.QUUX_CONTENTS, n, self._quux_txt_uri, self._quux_txt_readonly_uri = self.makefile_mutable(0, mdmf=True)
385            assert self._quux_txt_uri.startswith(b"URI:MDMF")
386            foo.set_uri(u"quux.txt", self._quux_txt_uri, self._quux_txt_readonly_uri)
387
388            foo.set_uri(u"empty", res[3][1].get_uri(),
389                        res[3][1].get_readonly_uri())
390            sub_uri = res[4][1].get_uri()
391            self._sub_uri = sub_uri
392            foo.set_uri(u"sub", sub_uri, sub_uri)
393            sub = self.s.create_node_from_uri(sub_uri)
394            self._sub_node = sub
395
396            _ign, n, blocking_uri = self.makefile(1)
397            foo.set_uri(u"blockingfile", blocking_uri, blocking_uri)
398
399            # filenode to test for html encoding issues
400            self._htmlname_unicode = u"<&weirdly'named\"file>>>_<iframe />.txt"
401            self._htmlname_raw = self._htmlname_unicode.encode('utf-8')
402            self._htmlname_urlencoded = urlquote(self._htmlname_raw, '')
403            self.HTMLNAME_CONTENTS, n, self._htmlname_txt_uri = self.makefile(0)
404            foo.set_uri(self._htmlname_unicode, self._htmlname_txt_uri, self._htmlname_txt_uri)
405
406            unicode_filename = u"n\u00fc.txt" # n u-umlaut . t x t
407            # ok, unicode calls it LATIN SMALL LETTER U WITH DIAERESIS but I
408            # still think of it as an umlaut
409            foo.set_uri(unicode_filename, self._bar_txt_uri, self._bar_txt_uri)
410
411            self.SUBBAZ_CONTENTS, n, baz_file = self.makefile(2)
412            self._baz_file_uri = baz_file
413            sub.set_uri(u"baz.txt", baz_file, baz_file)
414
415            _ign, n, self._bad_file_uri = self.makefile(3)
416            # this uri should not be downloadable
417            del self.s.all_contents[self._bad_file_uri]
418
419            rodir = res[5][1]
420            self.public_root.set_uri(u"reedownlee", rodir.get_readonly_uri(),
421                                     rodir.get_readonly_uri())
422            rodir.set_uri(u"nor", baz_file, baz_file)
423
424            # public/
425            # public/foo/
426            # public/foo/bar.txt
427            # public/foo/baz.txt
428            # public/foo/quux.txt
429            # public/foo/blockingfile
430            # public/foo/<&weirdly'named\"file>>>_<iframe />.txt
431            # public/foo/empty/
432            # public/foo/sub/
433            # public/foo/sub/baz.txt
434            # public/reedownlee/
435            # public/reedownlee/nor
436            self.NEWFILE_CONTENTS = b"newfile contents\n"
437
438            return foo.get_metadata_for(u"bar.txt")
439        d.addCallback(_then)
440        def _got_metadata(metadata):
441            self._bar_txt_metadata = metadata
442        d.addCallback(_got_metadata)
443        return d
444
445    def get_all_contents(self):
446        return self.s.all_contents
447
448    def makefile(self, number):
449        contents = b"contents of file %d\n" % number
450        n = create_chk_filenode(contents, self.get_all_contents())
451        return contents, n, n.get_uri()
452
453    def makefile_mutable(self, number, mdmf=False):
454        contents = b"contents of mutable file %d\n" % number
455        n = create_mutable_filenode(contents, mdmf, self.s.all_contents)
456        return contents, n, n.get_uri(), n.get_readonly_uri()
457
458    def tearDown(self):
459        return self.s.stopService()
460
461    def failUnlessIsBarDotTxt(self, res):
462        self.failUnlessReallyEqual(res, self.BAR_CONTENTS, res)
463
464    def failUnlessIsQuuxDotTxt(self, res):
465        self.failUnlessReallyEqual(res, self.QUUX_CONTENTS, res)
466
467    def failUnlessIsBazDotTxt(self, res):
468        self.failUnlessReallyEqual(res, self.BAZ_CONTENTS, res)
469
470    def failUnlessIsSubBazDotTxt(self, res):
471        self.failUnlessReallyEqual(res, self.SUBBAZ_CONTENTS, res)
472
473    def failUnlessIsBarJSON(self, res):
474        data = json.loads(res)
475        self.failUnless(isinstance(data, list))
476        self.failUnlessEqual(data[0], "filenode")
477        self.failUnless(isinstance(data[1], dict))
478        self.failIf(data[1]["mutable"])
479        self.failIfIn("rw_uri", data[1]) # immutable
480        self.failUnlessReallyEqual(to_bytes(data[1]["ro_uri"]), self._bar_txt_uri)
481        self.failUnlessReallyEqual(to_bytes(data[1]["verify_uri"]), self._bar_txt_verifycap)
482        self.failUnlessReallyEqual(data[1]["size"], len(self.BAR_CONTENTS))
483
484    def failUnlessIsQuuxJSON(self, res, readonly=False):
485        data = json.loads(res)
486        self.failUnless(isinstance(data, list))
487        self.failUnlessEqual(data[0], "filenode")
488        self.failUnless(isinstance(data[1], dict))
489        metadata = data[1]
490        return self.failUnlessIsQuuxDotTxtMetadata(metadata, readonly)
491
492    def failUnlessIsQuuxDotTxtMetadata(self, metadata, readonly):
493        self.failUnless(metadata['mutable'])
494        if readonly:
495            self.failIfIn("rw_uri", metadata)
496        else:
497            self.failUnlessIn("rw_uri", metadata)
498            self.failUnlessEqual(metadata['rw_uri'], str(self._quux_txt_uri, "ascii"))
499        self.failUnlessIn("ro_uri", metadata)
500        self.failUnlessEqual(metadata['ro_uri'], str(self._quux_txt_readonly_uri, "ascii"))
501        self.failUnlessReallyEqual(metadata['size'], len(self.QUUX_CONTENTS))
502
503    def failUnlessIsFooJSON(self, res):
504        data = json.loads(res)
505        self.failUnless(isinstance(data, list))
506        self.failUnlessEqual(data[0], "dirnode", res)
507        self.failUnless(isinstance(data[1], dict))
508        self.failUnless(data[1]["mutable"])
509        self.failUnlessIn("rw_uri", data[1]) # mutable
510        self.failUnlessReallyEqual(to_bytes(data[1]["rw_uri"]), self._foo_uri)
511        self.failUnlessReallyEqual(to_bytes(data[1]["ro_uri"]), self._foo_readonly_uri)
512        self.failUnlessReallyEqual(to_bytes(data[1]["verify_uri"]), self._foo_verifycap)
513
514        kidnames = sorted([str(n) for n in data[1]["children"]])
515        self.failUnlessEqual(kidnames,
516                             [self._htmlname_unicode, u"bar.txt", u"baz.txt",
517                              u"blockingfile", u"empty", u"n\u00fc.txt", u"quux.txt", u"sub"])
518        kids = dict( [(str(name),value)
519                      for (name,value)
520                      in list(data[1]["children"].items())] )
521        self.failUnlessEqual(kids[u"sub"][0], "dirnode")
522        self.failUnlessIn("metadata", kids[u"sub"][1])
523        self.failUnlessIn("tahoe", kids[u"sub"][1]["metadata"])
524        tahoe_md = kids[u"sub"][1]["metadata"]["tahoe"]
525        self.failUnlessIn("linkcrtime", tahoe_md)
526        self.failUnlessIn("linkmotime", tahoe_md)
527        self.failUnlessEqual(kids[u"bar.txt"][0], "filenode")
528        self.failUnlessReallyEqual(kids[u"bar.txt"][1]["size"], len(self.BAR_CONTENTS))
529        self.failUnlessReallyEqual(to_bytes(kids[u"bar.txt"][1]["ro_uri"]), self._bar_txt_uri)
530        self.failUnlessReallyEqual(to_bytes(kids[u"bar.txt"][1]["verify_uri"]),
531                                   self._bar_txt_verifycap)
532        self.failUnlessIn("metadata", kids[u"bar.txt"][1])
533        self.failUnlessIn("tahoe", kids[u"bar.txt"][1]["metadata"])
534        self.failUnlessReallyEqual(kids[u"bar.txt"][1]["metadata"]["tahoe"]["linkcrtime"],
535                                   self._bar_txt_metadata["tahoe"]["linkcrtime"])
536        self.failUnlessReallyEqual(to_bytes(kids[u"n\u00fc.txt"][1]["ro_uri"]),
537                                   self._bar_txt_uri)
538        self.failUnlessIn("quux.txt", kids)
539        self.failUnlessReallyEqual(to_bytes(kids[u"quux.txt"][1]["rw_uri"]),
540                                   self._quux_txt_uri)
541        self.failUnlessReallyEqual(to_bytes(kids[u"quux.txt"][1]["ro_uri"]),
542                                   self._quux_txt_readonly_uri)
543
544    @inlineCallbacks
545    def GET(self, urlpath, followRedirect=False, return_response=False,
546            **kwargs):
547        # if return_response=True, this fires with (data, statuscode,
548        # respheaders) instead of just data.
549
550        # treq can accept unicode URLs, unlike the old client.getPage
551        url = self.webish_url + urlpath
552        response = yield treq.request("get", url, persistent=False,
553                                      allow_redirects=followRedirect,
554                                      **kwargs)
555        data = yield response.content()
556        if return_response:
557            # we emulate the old HTTPClientGetFactory-based response, which
558            # wanted a tuple of (bytestring of data, bytestring of response
559            # code like "200" or "404", and a
560            # twisted.web.http_headers.Headers instance). Fortunately treq's
561            # response.headers has one.
562            returnValue( (data, str(response.code), response.headers) )
563        if 400 <= response.code < 600:
564            raise Error(response.code, response=data)
565        returnValue(data)
566
567    @inlineCallbacks
568    def HEAD(self, urlpath, return_response=False, headers=None):
569        if headers is None:
570            headers = {}
571        url = self.webish_url + urlpath
572        response = yield treq.request("head", url, persistent=False,
573                                      headers=headers)
574        if 400 <= response.code < 600:
575            raise Error(response.code, response="")
576        returnValue( ("", response.code, response.headers) )
577
578    def PUT(self, urlpath, data, headers=None):
579        if headers is None:
580            headers = {}
581        url = self.webish_url + urlpath
582        return do_http("put", url, data=data, headers=headers)
583
584    def DELETE(self, urlpath):
585        url = self.webish_url + urlpath
586        return do_http("delete", url)
587
588    def build_form(self, **fields):
589        sepbase = b"boogabooga"
590        sep = b"--" + sepbase
591        form = []
592        form.append(sep)
593        form.append(b'Content-Disposition: form-data; name="_charset"')
594        form.append(b'')
595        form.append(b'UTF-8')
596        form.append(sep)
597        for name, value in list(fields.items()):
598            if isinstance(name, str):
599                name = name.encode("utf-8")
600            if isinstance(value, tuple):
601                filename, value = value
602                if isinstance(filename, str):
603                    filename = filename.encode("utf-8")
604                form.append(b'Content-Disposition: form-data; name="%s"; '
605                            b'filename="%s"' % (name, filename))
606            else:
607                form.append(b'Content-Disposition: form-data; name="%s"' % name)
608            form.append(b'')
609            if isinstance(value, str):
610                value = value.encode("utf-8")
611            form.append(value)
612            form.append(sep)
613        form[-1] += b"--"
614        body = b""
615        headers = {}
616        if fields:
617            body = b"\r\n".join(form) + b"\r\n"
618            headers["content-type"] = "multipart/form-data; boundary=%s" % str(sepbase, "utf-8")
619        return (body, headers)
620
621    def POST(self, urlpath, **fields):
622        body, headers = self.build_form(**fields)
623        return self.POST2(urlpath, body, headers)
624
625    def POST2(self, urlpath, body="", headers=None, followRedirect=False):
626        if headers is None:
627            headers = {}
628        url = self.webish_url + urlpath
629        if isinstance(body, str):
630            body = body.encode("utf-8")
631        return do_http("POST", url, allow_redirects=followRedirect,
632                       headers=headers, data=body)
633
634    def shouldFail(self, res, expected_failure, which,
635                   substring=None, response_substring=None):
636        if isinstance(res, failure.Failure):
637            res.trap(expected_failure)
638            if substring:
639                self.failUnlessIn(substring, str(res), which)
640            if response_substring:
641                self.failUnlessIn(response_substring, res.value.response, which)
642        else:
643            self.fail("%r was supposed to raise %s, not get %r" %
644                      (which, expected_failure, res))
645
646    def shouldFail2(self, expected_failure, which, substring,
647                    response_substring,
648                    callable, *args, **kwargs):
649        assert substring is None or isinstance(substring, str)
650        assert response_substring is None or isinstance(response_substring, str)
651        d = defer.maybeDeferred(callable, *args, **kwargs)
652        def done(res):
653            if isinstance(res, failure.Failure):
654                res.trap(expected_failure)
655                if substring:
656                    self.failUnlessIn(substring, str(res),
657                                      "%r not in %r (response is %r) for test %r" % \
658                                      (substring, str(res),
659                                       getattr(res.value, "response", ""),
660                                       which))
661                if response_substring:
662                    response = res.value.response
663                    if isinstance(response, bytes):
664                        response = str(response, "utf-8")
665                    self.failUnlessIn(response_substring, response,
666                                      "%r not in %r for test %r" % \
667                                      (response_substring, res.value.response,
668                                       which))
669            else:
670                self.fail("%r was supposed to raise %s, not get %r" %
671                          (which, expected_failure, res))
672        d.addBoth(done)
673        return d
674
675    def should404(self, res, which):
676        if isinstance(res, failure.Failure):
677            res.trap(error.Error)
678            self.failUnlessReallyEqual(res.value.status, b"404")
679        else:
680            self.fail("%s was supposed to Error(404), not get '%s'" %
681                      (which, res))
682
683    def should302(self, res, which):
684        if isinstance(res, failure.Failure):
685            res.trap(error.Error)
686            self.failUnlessReallyEqual(res.value.status, b"302")
687        else:
688            self.fail("%s was supposed to Error(302), not get '%s'" %
689                      (which, res))
690
691
692class MultiFormatResourceTests(TrialTestCase):
693    """
694    Tests for ``MultiFormatResource``.
695    """
696    def render(self, resource, **queryargs):
697        # Query arguments in real twisted.web requests have byte keys.
698        queryargs = {k.encode("utf-8"): v for (k, v) in list(queryargs.items())}
699        return self.successResultOf(render(resource, queryargs))
700
701    def resource(self):
702        """
703        Create and return an instance of a ``MultiFormatResource`` subclass
704        with a default HTML format, and two custom formats: ``a`` and ``b``.
705        """
706        class Content(MultiFormatResource):
707
708            def render_HTML(self, req):
709                return b"html"
710
711            def render_A(self, req):
712                return b"a"
713
714            def render_B(self, req):
715                return b"b"
716
717        return Content()
718
719
720    def test_select_format(self):
721        """
722        The ``formatArgument`` attribute of a ``MultiFormatResource`` subclass
723        identifies the query argument which selects the result format.
724        """
725        resource = self.resource()
726        resource.formatArgument = "foo"
727        self.assertEqual(b"a", self.render(resource, foo=[b"a"]))
728
729
730    def test_default_format_argument(self):
731        """
732        If a ``MultiFormatResource`` subclass does not set ``formatArgument``
733        then the ``t`` argument is used.
734        """
735        resource = self.resource()
736        self.assertEqual(b"a", self.render(resource, t=[b"a"]))
737
738
739    def test_no_format(self):
740        """
741        If no value is given for the format argument and no default format has
742        been defined, the base rendering behavior is used (``render_HTML``).
743        """
744        resource = self.resource()
745        self.assertEqual(b"html", self.render(resource))
746
747
748    def test_default_format(self):
749        """
750        If no value is given for the format argument and the ``MultiFormatResource``
751        subclass defines a ``formatDefault``, that value is used as the format
752        to render.
753        """
754        resource = self.resource()
755        resource.formatDefault = "b"
756        self.assertEqual(b"b", self.render(resource))
757
758
759    def test_explicit_none_format_renderer(self):
760        """
761        If a format is selected which has a renderer set to ``None``, the base
762        rendering behavior is used (``render_HTML``).
763        """
764        resource = self.resource()
765        resource.render_FOO = None
766        self.assertEqual(b"html", self.render(resource, t=[b"foo"]))
767
768
769    def test_unknown_format(self):
770        """
771        If a format is selected for which there is no renderer, an error is
772        returned.
773        """
774        resource = self.resource()
775        response_body = self.render(resource, t=[b"foo"])
776        self.assertIn(
777            b"<title>400 - Bad Format</title>", response_body,
778        )
779        self.assertIn(
780            b"Unknown t value:", response_body,
781        )
782        self.assertIn(
783            b"'foo'", response_body,
784        )
785
786
787class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixin, TrialTestCase):
788    maxDiff = None
789
790    def test_create(self):
791        pass
792
793    def _assertResponseHeaders(self, name, values):
794        """
795        Assert that the resource at **/** is served with a response header named
796        ``name`` and values ``values``.
797
798        :param bytes name: The name of the header item to check.
799        :param [bytes] values: The expected values.
800
801        :return Deferred: A Deferred that fires successfully if the expected
802            header item is found and which fails otherwise.
803        """
804        d = self.GET("/", return_response=True)
805        def responded(result):
806            _, _, headers = result
807            self.assertEqual(
808                values,
809                headers.getRawHeaders(name),
810            )
811        d.addCallback(responded)
812        return d
813
814    def test_frame_options(self):
815        """
816        Pages deny the ability to be loaded in frames.
817        """
818        # It should be all pages but we only demonstrate it for / with this test.
819        return self._assertResponseHeaders(b"X-Frame-Options", [b"DENY"])
820
821    def test_referrer_policy(self):
822        """
823        Pages set a **no-referrer** policy.
824        """
825        # It should be all pages but we only demonstrate it for / with this test.
826        return self._assertResponseHeaders(b"Referrer-Policy", [b"no-referrer"])
827
828    def test_welcome_json(self):
829        """
830        There is a JSON version of the welcome page which can be selected with the
831        ``t`` query argument.
832        """
833        d = self.GET("/?t=json")
834        def _check(res):
835            """
836            Check that the results are correct.
837            We can't depend on the order of servers in the output
838            """
839            decoded = json.loads(res)
840            self.assertEqual(decoded['introducers'], {u'statuses': []})
841            actual_servers = decoded[u"servers"]
842            self.assertEquals(len(actual_servers), 2)
843            self.assertIn(
844                {
845                    u"nodeid": u'other_nodeid',
846                    u'available_space': 123456,
847                    u'connection_status': u'summary',
848                    u'last_received_data': 30,
849                    u'nickname': u'other_nickname \u263b',
850                    u'version': u'1.0',
851                },
852                actual_servers
853            )
854            self.assertIn(
855                {
856                    u"nodeid": u'disconnected_nodeid',
857                    u'available_space': 123456,
858                    u'connection_status': u'summary',
859                    u'last_received_data': 35,
860                    u'nickname': u'disconnected_nickname \u263b',
861                    u'version': u'1.0',
862                },
863                actual_servers
864            )
865
866        d.addCallback(_check)
867        return d
868
869    def test_introducer_status(self):
870        class MockIntroducerClient(object):
871            def __init__(self, connected):
872                self.connected = connected
873            def connection_status(self):
874                return ConnectionStatus(self.connected, "summary", {}, 0, 0)
875
876        d = defer.succeed(None)
877
878        # introducer not connected, unguessable furl
879        def _set_introducer_not_connected_unguessable(ign):
880            self.s.introducer_furls = [ "pb://someIntroducer/secret" ]
881            self.s.introducer_clients = [ MockIntroducerClient(False) ]
882            return self.GET("/")
883        d.addCallback(_set_introducer_not_connected_unguessable)
884        def _check_introducer_not_connected_unguessable(res):
885            soup = BeautifulSoup(res, 'html5lib')
886            self.failIfIn(b'pb://someIntroducer/secret', res)
887            assert_soup_has_tag_with_attributes(
888                self, soup, u"img",
889                {u"alt": u"Disconnected", u"src": u"img/connected-no.png"}
890            )
891            assert_soup_has_tag_with_content(
892                self, soup, u"div",
893                u"No introducers connected"
894            )
895        d.addCallback(_check_introducer_not_connected_unguessable)
896
897        # introducer connected, unguessable furl
898        def _set_introducer_connected_unguessable(ign):
899            self.s.introducer_furls = [ "pb://someIntroducer/secret" ]
900            self.s.introducer_clients = [ MockIntroducerClient(True) ]
901            return self.GET("/")
902        d.addCallback(_set_introducer_connected_unguessable)
903        def _check_introducer_connected_unguessable(res):
904            soup = BeautifulSoup(res, 'html5lib')
905            assert_soup_has_tag_with_attributes_and_content(
906                self, soup, u"div",
907                u"summary",
908                { u"class": u"connection-status", u"title": u"(no other hints)" }
909            )
910            self.failIfIn(b'pb://someIntroducer/secret', res)
911            assert_soup_has_tag_with_attributes(
912                self, soup, u"img",
913                { u"alt": u"Connected", u"src": u"img/connected-yes.png" }
914            )
915            assert_soup_has_tag_with_content(
916                self, soup, u"div",
917                u"1 introducer connected"
918            )
919        d.addCallback(_check_introducer_connected_unguessable)
920
921        # introducer connected, guessable furl
922        def _set_introducer_connected_guessable(ign):
923            self.s.introducer_furls = [ "pb://someIntroducer/introducer" ]
924            self.s.introducer_clients = [ MockIntroducerClient(True) ]
925            return self.GET("/")
926        d.addCallback(_set_introducer_connected_guessable)
927        def _check_introducer_connected_guessable(res):
928            soup = BeautifulSoup(res, 'html5lib')
929            assert_soup_has_tag_with_attributes_and_content(
930                self, soup, u"div",
931                u"summary",
932                { u"class": u"connection-status", u"title": u"(no other hints)" }
933            )
934            assert_soup_has_tag_with_attributes(
935                self, soup, u"img",
936                { u"src": u"img/connected-yes.png", u"alt": u"Connected" }
937            )
938            assert_soup_has_tag_with_content(
939                self, soup, u"div",
940                u"1 introducer connected"
941            )
942        d.addCallback(_check_introducer_connected_guessable)
943        return d
944
945    def test_helper_status(self):
946        d = defer.succeed(None)
947
948        # set helper furl to None
949        def _set_no_helper(ign):
950            self.s.uploader.helper_furl = None
951            return self.GET("/")
952        d.addCallback(_set_no_helper)
953        def _check_no_helper(res):
954            soup = BeautifulSoup(res, 'html5lib')
955            assert_soup_has_tag_with_attributes(
956                self, soup, u"img",
957                { u"src": u"img/connected-not-configured.png", u"alt": u"Not Configured" }
958            )
959        d.addCallback(_check_no_helper)
960
961        # enable helper, not connected
962        def _set_helper_not_connected(ign):
963            self.s.uploader.helper_furl = "pb://someHelper/secret"
964            self.s.uploader.helper_connected = False
965            return self.GET("/")
966        d.addCallback(_set_helper_not_connected)
967        def _check_helper_not_connected(res):
968            soup = BeautifulSoup(res, 'html5lib')
969            assert_soup_has_tag_with_attributes_and_content(
970                self, soup, u"div",
971                u"pb://someHelper/[censored]",
972                { u"class": u"furl" }
973            )
974            self.failIfIn(b'pb://someHelper/secret', res)
975            assert_soup_has_tag_with_attributes(
976                self, soup, u"img",
977                { u"src": u"img/connected-no.png", u"alt": u"Disconnected" }
978            )
979        d.addCallback(_check_helper_not_connected)
980
981        # enable helper, connected
982        def _set_helper_connected(ign):
983            self.s.uploader.helper_furl = "pb://someHelper/secret"
984            self.s.uploader.helper_connected = True
985            return self.GET("/")
986        d.addCallback(_set_helper_connected)
987        def _check_helper_connected(res):
988            soup = BeautifulSoup(res, 'html5lib')
989            assert_soup_has_tag_with_attributes_and_content(
990                self, soup, u"div",
991                u"pb://someHelper/[censored]",
992                { u"class": u"furl" }
993            )
994            self.failIfIn(b'pb://someHelper/secret', res)
995            assert_soup_has_tag_with_attributes(
996                self, soup, u"img",
997                { u"src": u"img/connected-yes.png", "alt": u"Connected" }
998            )
999        d.addCallback(_check_helper_connected)
1000        return d
1001
1002    def test_storage(self):
1003        d = self.GET("/storage")
1004        def _check(res):
1005            soup = BeautifulSoup(res, 'html5lib')
1006            assert_soup_has_text(self, soup, 'Storage Server Status')
1007            assert_soup_has_favicon(self, soup)
1008            res_u = res.decode('utf-8')
1009            self.failUnlessIn(u'<li>Server Nickname: <span class="nickname mine">fake_nickname \u263A</span></li>', res_u)
1010        d.addCallback(_check)
1011        return d
1012
1013    def test_status(self):
1014        h = self.s.get_history()
1015        dl_num = h.list_all_download_statuses()[0].get_counter()
1016        ul_num = h.list_all_upload_statuses()[0].get_counter()
1017        mu_num = h.list_all_mapupdate_statuses()[0].get_counter()
1018        pub_num = h.list_all_publish_statuses()[0].get_counter()
1019        ret_num = h.list_all_retrieve_statuses()[0].get_counter()
1020        d = self.GET("/status", followRedirect=True)
1021        def _check(res):
1022            res = str(res, "utf-8")
1023            self.failUnlessIn('Recent and Active Operations', res)
1024            self.failUnlessIn('"/status/down-%d"' % dl_num, res)
1025            self.failUnlessIn('"/status/up-%d"' % ul_num, res)
1026            self.failUnlessIn('"/status/mapupdate-%d"' % mu_num, res)
1027            self.failUnlessIn('"/status/publish-%d"' % pub_num, res)
1028            self.failUnlessIn('"/status/retrieve-%d"' % ret_num, res)
1029        d.addCallback(_check)
1030        d.addCallback(lambda res: self.GET("/status/?t=json"))
1031        def _check_json(res):
1032            data = json.loads(res)
1033            self.failUnless(isinstance(data, dict))
1034            #active = data["active"]
1035            # TODO: test more. We need a way to fake an active operation
1036            # here.
1037        d.addCallback(_check_json)
1038
1039        d.addCallback(lambda res: self.GET("/status/down-%d" % dl_num))
1040        def _check_dl(res):
1041            self.failUnlessIn(b"File Download Status", res)
1042        d.addCallback(_check_dl)
1043        d.addCallback(lambda res: self.GET("/status/down-%d/event_json" % dl_num))
1044        def _check_dl_json(res):
1045            data = json.loads(res)
1046            self.failUnless(isinstance(data, dict))
1047            self.failUnlessIn("read", data)
1048            self.failUnlessEqual(data["read"][0]["length"], 120)
1049            self.failUnlessEqual(data["segment"][0]["segment_length"], 100)
1050            self.failUnlessEqual(data["segment"][2]["segment_number"], 2)
1051            self.failUnlessEqual(data["segment"][2]["finish_time"], None)
1052            phwr_id = str(base32.b2a(hashutil.tagged_hash(b"foo", b"serverid_a")[:20]), "ascii")
1053            cmpu_id = str(base32.b2a(hashutil.tagged_hash(b"foo", b"serverid_b")[:20]), "ascii")
1054            # serverids[] keys are strings, since that's what JSON does, but
1055            # we'd really like them to be ints
1056            self.failUnlessEqual(data["serverids"]["0"], "phwrsjte")
1057            self.failUnless("1" in data["serverids"],
1058                            str(data["serverids"]))
1059            self.failUnlessEqual(data["serverids"]["1"], "cmpuvkjm",
1060                                 str(data["serverids"]))
1061            self.failUnlessEqual(data["server_info"][phwr_id]["short"],
1062                                 "phwrsjte")
1063            self.failUnlessEqual(data["server_info"][cmpu_id]["short"],
1064                                 "cmpuvkjm")
1065            self.failUnlessIn("dyhb", data)
1066            self.failUnlessIn("misc", data)
1067        d.addCallback(_check_dl_json)
1068        d.addCallback(lambda res: self.GET("/status/up-%d" % ul_num))
1069        def _check_ul(res):
1070            self.failUnlessIn(b"File Upload Status", res)
1071        d.addCallback(_check_ul)
1072        d.addCallback(lambda res: self.GET("/status/mapupdate-%d" % mu_num))
1073        def _check_mapupdate(res):
1074            self.failUnlessIn(b"Mutable File Servermap Update Status", res)
1075        d.addCallback(_check_mapupdate)
1076        d.addCallback(lambda res: self.GET("/status/publish-%d" % pub_num))
1077        def _check_publish(res):
1078            self.failUnlessIn(b"Mutable File Publish Status", res)
1079        d.addCallback(_check_publish)
1080        d.addCallback(lambda res: self.GET("/status/retrieve-%d" % ret_num))
1081        def _check_retrieve(res):
1082            self.failUnlessIn(b"Mutable File Retrieve Status", res)
1083        d.addCallback(_check_retrieve)
1084
1085        return d
1086
1087    def test_status_path_nodash_error(self):
1088        """
1089        Expect an error, because path is expected to be of the form
1090        "/status/{up,down,..}-%number", with a hyphen.
1091        """
1092        return self.shouldFail2(error.Error,
1093                                "test_status_path_nodash",
1094                                "400 Bad Request",
1095                                "no '-' in 'nodash'",
1096                                self.GET,
1097                                "/status/nodash")
1098
1099    def test_status_page_contains_links(self):
1100        """
1101        Check that the rendered `/status` page contains all the
1102        expected links.
1103        """
1104        def _check_status_page_links(response):
1105            (body, status, _) = response
1106
1107            self.failUnlessReallyEqual(int(status), 200)
1108
1109            soup = BeautifulSoup(body, 'html5lib')
1110            h = self.s.get_history()
1111
1112            # Check for `<a href="/status/retrieve-0">Not started</a>`
1113            ret_num = h.list_all_retrieve_statuses()[0].get_counter()
1114            assert_soup_has_tag_with_attributes_and_content(
1115                self, soup, u"a",
1116                u"Not started",
1117                {u"href": u"/status/retrieve-{}".format(ret_num)}
1118            )
1119
1120            # Check for `<a href="/status/publish-0">Not started</a></td>`
1121            pub_num = h.list_all_publish_statuses()[0].get_counter()
1122            assert_soup_has_tag_with_attributes_and_content(
1123                self, soup, u"a",
1124                u"Not started",
1125                {u"href": u"/status/publish-{}".format(pub_num)}
1126            )
1127
1128            # Check for `<a href="/status/mapupdate-0">Not started</a>`
1129            mu_num = h.list_all_mapupdate_statuses()[0].get_counter()
1130            assert_soup_has_tag_with_attributes_and_content(
1131                self, soup, u"a",
1132                u"Not started",
1133                {u"href": u"/status/mapupdate-{}".format(mu_num)}
1134            )
1135
1136            # Check for `<a href="/status/down-0">fetching segments
1137            # 2,3; errors on segment 1</a>`: see build_one_ds() above.
1138            dl_num = h.list_all_download_statuses()[0].get_counter()
1139            assert_soup_has_tag_with_attributes_and_content(
1140                self, soup, u"a",
1141                u"fetching segments 2,3; errors on segment 1",
1142                {u"href": u"/status/down-{}".format(dl_num)}
1143            )
1144
1145            # Check for `<a href="/status/up-0">Not started</a>`
1146            ul_num = h.list_all_upload_statuses()[0].get_counter()
1147            assert_soup_has_tag_with_attributes_and_content(
1148                self, soup, u"a",
1149                u"Not started",
1150                {u"href": u"/status/up-{}".format(ul_num)}
1151            )
1152
1153        d = self.GET("/status", return_response=True)
1154        d.addCallback(_check_status_page_links)
1155        return d
1156
1157    def test_status_path_trailing_slashes(self):
1158        """
1159        Test that both `GET /status` and `GET /status/` are treated
1160        alike, but reject any additional trailing slashes and other
1161        non-existent child nodes.
1162        """
1163        def _check_status(response):
1164            (body, status, _) = response
1165
1166            self.failUnlessReallyEqual(int(status), 200)
1167
1168            soup = BeautifulSoup(body, 'html5lib')
1169            assert_soup_has_favicon(self, soup)
1170            assert_soup_has_tag_with_content(
1171                self, soup, u"title",
1172                u"Tahoe-LAFS - Recent and Active Operations"
1173            )
1174
1175        d = self.GET("/status", return_response=True)
1176        d.addCallback(_check_status)
1177
1178        d = self.GET("/status/", return_response=True)
1179        d.addCallback(_check_status)
1180
1181        d =  self.shouldFail2(error.Error,
1182                              "test_status_path_trailing_slashes",
1183                              "400 Bad Request",
1184                              "no '-' in ''",
1185                              self.GET,
1186                              "/status//")
1187
1188        d =  self.shouldFail2(error.Error,
1189                              "test_status_path_trailing_slashes",
1190                              "400 Bad Request",
1191                              "no '-' in ''",
1192                              self.GET,
1193                              "/status////////")
1194
1195        return d
1196
1197    def test_status_path_404_error(self):
1198        """
1199        Looking for non-existent statuses under child paths should
1200        exercises all the iterators in web.status.Status.getChild().
1201
1202        The test suite (hopefully!) would not have done any setup for
1203        a very large number of statuses at this point, now or in the
1204        future, so these all should always return 404.
1205        """
1206        d = self.GET("/status/up-9999999")
1207        d.addBoth(self.should404, "test_status_path_404_error (up)")
1208
1209        d = self.GET("/status/down-9999999")
1210        d.addBoth(self.should404, "test_status_path_404_error (down)")
1211
1212        d = self.GET("/status/mapupdate-9999999")
1213        d.addBoth(self.should404, "test_status_path_404_error (mapupdate)")
1214
1215        d = self.GET("/status/publish-9999999")
1216        d.addBoth(self.should404, "test_status_path_404_error (publish)")
1217
1218        d = self.GET("/status/retrieve-9999999")
1219        d.addBoth(self.should404, "test_status_path_404_error (retrieve)")
1220
1221        return d
1222
1223    def _check_status_subpath_result(self, result, expected_title):
1224        """
1225        Helper to verify that results of "GET /status/up-0" and
1226        similar are as expected.
1227        """
1228        body, status, _ = result
1229        self.failUnlessReallyEqual(int(status), 200)
1230        soup = BeautifulSoup(body, 'html5lib')
1231        assert_soup_has_favicon(self, soup)
1232        assert_soup_has_tag_with_content(
1233            self, soup, u"title", expected_title
1234        )
1235
1236    def test_status_up_subpath(self):
1237        """
1238        See that "GET /status/up-0" works.
1239        """
1240        h = self.s.get_history()
1241        ul_num = h.list_all_upload_statuses()[0].get_counter()
1242        d = self.GET("/status/up-{}".format(ul_num), return_response=True)
1243        d.addCallback(self._check_status_subpath_result,
1244                      u"Tahoe-LAFS - File Upload Status")
1245        return d
1246
1247    def test_status_down_subpath(self):
1248        """
1249        See that "GET /status/down-0" works.
1250        """
1251        h = self.s.get_history()
1252        dl_num = h.list_all_download_statuses()[0].get_counter()
1253        d = self.GET("/status/down-{}".format(dl_num), return_response=True)
1254        d.addCallback(self._check_status_subpath_result,
1255                      u"Tahoe-LAFS - File Download Status")
1256        return d
1257
1258    def test_status_mapupdate_subpath(self):
1259        """
1260        See that "GET /status/mapupdate-0" works.
1261        """
1262        h = self.s.get_history()
1263        mu_num = h.list_all_mapupdate_statuses()[0].get_counter()
1264        d = self.GET("/status/mapupdate-{}".format(mu_num), return_response=True)
1265        d.addCallback(self._check_status_subpath_result,
1266                      u"Tahoe-LAFS - Mutable File Servermap Update Status")
1267        return d
1268
1269    def test_status_publish_subpath(self):
1270        """
1271        See that "GET /status/publish-0" works.
1272        """
1273        h = self.s.get_history()
1274        pub_num = h.list_all_publish_statuses()[0].get_counter()
1275        d = self.GET("/status/publish-{}".format(pub_num), return_response=True)
1276        d.addCallback(self._check_status_subpath_result,
1277                      u"Tahoe-LAFS - Mutable File Publish Status")
1278        return d
1279
1280    def test_status_retrieve_subpath(self):
1281        """
1282        See that "GET /status/retrieve-0" works.
1283        """
1284        h = self.s.get_history()
1285        ret_num = h.list_all_retrieve_statuses()[0].get_counter()
1286        d = self.GET("/status/retrieve-{}".format(ret_num), return_response=True)
1287        d.addCallback(self._check_status_subpath_result,
1288                      u"Tahoe-LAFS - Mutable File Retrieve Status")
1289        return d
1290
1291    def test_GET_FILEURL(self):
1292        d = self.GET(self.public_url + "/foo/bar.txt")
1293        d.addCallback(self.failUnlessIsBarDotTxt)
1294        return d
1295
1296    def test_GET_FILEURL_range(self):
1297        headers = {"range": "bytes=1-10"}
1298        d = self.GET(self.public_url + "/foo/bar.txt", headers=headers,
1299                     return_response=True)
1300        def _got(res_and_status_and_headers):
1301            (res, status, headers) = res_and_status_and_headers
1302            self.failUnlessReallyEqual(int(status), 206)
1303            self.failUnless(headers.hasHeader("content-range"))
1304            self.failUnlessReallyEqual(headers.getRawHeaders("content-range")[0],
1305                                       "bytes 1-10/%d" % len(self.BAR_CONTENTS))
1306            self.failUnlessReallyEqual(res, self.BAR_CONTENTS[1:11])
1307        d.addCallback(_got)
1308        return d
1309
1310    def test_GET_FILEURL_partial_range(self):
1311        headers = {"range": "bytes=5-"}
1312        length  = len(self.BAR_CONTENTS)
1313        d = self.GET(self.public_url + "/foo/bar.txt", headers=headers,
1314                     return_response=True)
1315        def _got(res_and_status_and_headers):
1316            (res, status, headers) = res_and_status_and_headers
1317            self.failUnlessReallyEqual(int(status), 206)
1318            self.failUnless(headers.hasHeader("content-range"))
1319            self.failUnlessReallyEqual(headers.getRawHeaders("content-range")[0],
1320                                       "bytes 5-%d/%d" % (length-1, length))
1321            self.failUnlessReallyEqual(res, self.BAR_CONTENTS[5:])
1322        d.addCallback(_got)
1323        return d
1324
1325    def test_GET_FILEURL_partial_end_range(self):
1326        headers = {"range": "bytes=-5"}
1327        length  = len(self.BAR_CONTENTS)
1328        d = self.GET(self.public_url + "/foo/bar.txt", headers=headers,
1329                     return_response=True)
1330        def _got(res_and_status_and_headers):
1331            (res, status, headers) = res_and_status_and_headers
1332            self.failUnlessReallyEqual(int(status), 206)
1333            self.failUnless(headers.hasHeader("content-range"))
1334            self.failUnlessReallyEqual(headers.getRawHeaders("content-range")[0],
1335                                       "bytes %d-%d/%d" % (length-5, length-1, length))
1336            self.failUnlessReallyEqual(res, self.BAR_CONTENTS[-5:])
1337        d.addCallback(_got)
1338        return d
1339
1340    def test_GET_FILEURL_partial_range_overrun(self):
1341        headers = {"range": "bytes=100-200"}
1342        d = self.shouldFail2(error.Error, "test_GET_FILEURL_range_overrun",
1343                             "416 Requested Range not satisfiable",
1344                             "First beyond end of file",
1345                             self.GET, self.public_url + "/foo/bar.txt",
1346                             headers=headers)
1347        return d
1348
1349    def test_HEAD_FILEURL_range(self):
1350        headers = {"range": "bytes=1-10"}
1351        d = self.HEAD(self.public_url + "/foo/bar.txt", headers=headers,
1352                     return_response=True)
1353        def _got(res_and_status_and_headers):
1354            (res, status, headers) = res_and_status_and_headers
1355            self.failUnlessReallyEqual(res, "")
1356            self.failUnlessReallyEqual(int(status), 206)
1357            self.failUnless(headers.hasHeader("content-range"))
1358            self.failUnlessReallyEqual(headers.getRawHeaders("content-range")[0],
1359                                       "bytes 1-10/%d" % len(self.BAR_CONTENTS))
1360        d.addCallback(_got)
1361        return d
1362
1363    def test_HEAD_FILEURL_partial_range(self):
1364        headers = {"range": "bytes=5-"}
1365        length  = len(self.BAR_CONTENTS)
1366        d = self.HEAD(self.public_url + "/foo/bar.txt", headers=headers,
1367                     return_response=True)
1368        def _got(res_and_status_and_headers):
1369            (res, status, headers) = res_and_status_and_headers
1370            self.failUnlessReallyEqual(int(status), 206)
1371            self.failUnless(headers.hasHeader("content-range"))
1372            self.failUnlessReallyEqual(headers.getRawHeaders("content-range")[0],
1373                                       "bytes 5-%d/%d" % (length-1, length))
1374        d.addCallback(_got)
1375        return d
1376
1377    def test_HEAD_FILEURL_partial_end_range(self):
1378        headers = {"range": "bytes=-5"}
1379        length  = len(self.BAR_CONTENTS)
1380        d = self.HEAD(self.public_url + "/foo/bar.txt", headers=headers,
1381                     return_response=True)
1382        def _got(res_and_status_and_headers):
1383            (res, status, headers) = res_and_status_and_headers
1384            self.failUnlessReallyEqual(int(status), 206)
1385            self.failUnless(headers.hasHeader("content-range"))
1386            self.failUnlessReallyEqual(headers.getRawHeaders("content-range")[0],
1387                                       "bytes %d-%d/%d" % (length-5, length-1, length))
1388        d.addCallback(_got)
1389        return d
1390
1391    def test_HEAD_FILEURL_partial_range_overrun(self):
1392        headers = {"range": "bytes=100-200"}
1393        d = self.shouldFail2(error.Error, "test_HEAD_FILEURL_range_overrun",
1394                             "416 Requested Range not satisfiable",
1395                             "",
1396                             self.HEAD, self.public_url + "/foo/bar.txt",
1397                             headers=headers)
1398        return d
1399
1400    def test_GET_FILEURL_range_bad(self):
1401        headers = {"range": "BOGUS=fizbop-quarnak"}
1402        d = self.GET(self.public_url + "/foo/bar.txt", headers=headers,
1403                     return_response=True)
1404        def _got(res_and_status_and_headers):
1405            (res, status, headers) = res_and_status_and_headers
1406            self.failUnlessReallyEqual(int(status), 200)
1407            self.failUnless(not headers.hasHeader("content-range"))
1408            self.failUnlessReallyEqual(res, self.BAR_CONTENTS)
1409        d.addCallback(_got)
1410        return d
1411
1412    def test_HEAD_FILEURL(self):
1413        d = self.HEAD(self.public_url + "/foo/bar.txt", return_response=True)
1414        def _got(res_and_status_and_headers):
1415            (res, status, headers) = res_and_status_and_headers
1416            self.failUnlessReallyEqual(res, "")
1417            self.failUnlessReallyEqual(int(headers.getRawHeaders("content-length")[0]),
1418                                       len(self.BAR_CONTENTS))
1419            self.failUnlessReallyEqual(headers.getRawHeaders("content-type"),
1420                                       ["text/plain"])
1421        d.addCallback(_got)
1422        return d
1423
1424    def test_GET_FILEURL_named(self):
1425        base = "/file/%s" % urlquote(self._bar_txt_uri)
1426        base2 = "/named/%s" % urlquote(self._bar_txt_uri)
1427        d = self.GET(base + "/@@name=/blah.txt")
1428        d.addCallback(self.failUnlessIsBarDotTxt)
1429        d.addCallback(lambda res: self.GET(base + "/blah.txt"))
1430        d.addCallback(self.failUnlessIsBarDotTxt)
1431        d.addCallback(lambda res: self.GET(base + "/ignore/lots/blah.txt"))
1432        d.addCallback(self.failUnlessIsBarDotTxt)
1433        d.addCallback(lambda res: self.GET(base2 + "/@@name=/blah.txt"))
1434        d.addCallback(self.failUnlessIsBarDotTxt)
1435        save_url = base + "?save=true&filename=blah.txt"
1436        d.addCallback(lambda res: self.GET(save_url))
1437        d.addCallback(self.failUnlessIsBarDotTxt) # TODO: check headers
1438        u_filename = u"n\u00e9wer.txt" # n e-acute w e r . t x t
1439        u_fn_e = urlquote(u_filename.encode("utf-8"))
1440        u_url = base + "?save=true&filename=" + u_fn_e
1441        d.addCallback(lambda res: self.GET(u_url))
1442        d.addCallback(self.failUnlessIsBarDotTxt) # TODO: check headers
1443        return d
1444
1445    def test_PUT_FILEURL_named_bad(self):
1446        base = "/file/%s" % urlquote(self._bar_txt_uri)
1447        d = self.shouldFail2(error.Error, "test_PUT_FILEURL_named_bad",
1448                             "400 Bad Request",
1449                             "/file can only be used with GET or HEAD",
1450                             self.PUT, base + "/@@name=/blah.txt", "")
1451        return d
1452
1453
1454    def test_GET_DIRURL_named_bad(self):
1455        base = "/file/%s" % urlquote(self._foo_uri)
1456        d = self.shouldFail2(error.Error, "test_PUT_DIRURL_named_bad",
1457                             "400 Bad Request",
1458                             "is not a file-cap",
1459                             self.GET, base + "/@@name=/blah.txt")
1460        return d
1461
1462    def test_GET_slash_file_bad(self):
1463        d = self.shouldFail2(error.Error, "test_GET_slash_file_bad",
1464                             "404 Not Found",
1465                             "/file must be followed by a file-cap and a name",
1466                             self.GET, "/file")
1467        return d
1468
1469    def test_GET_unhandled_URI_named(self):
1470        contents, n, newuri = self.makefile(12)
1471        verifier_cap = n.get_verify_cap().to_string()
1472        base = "/file/%s" % urlquote(verifier_cap)
1473        # client.create_node_from_uri() can't handle verify-caps
1474        d = self.shouldFail2(error.Error, "GET_unhandled_URI_named",
1475                             "400 Bad Request", "is not a file-cap",
1476                             self.GET, base)
1477        return d
1478
1479    def test_GET_unhandled_URI(self):
1480        contents, n, newuri = self.makefile(12)
1481        verifier_cap = n.get_verify_cap().to_string()
1482        base = "/uri/%s" % urlquote(verifier_cap)
1483        # client.create_node_from_uri() can't handle verify-caps
1484        d = self.shouldFail2(error.Error, "test_GET_unhandled_URI",
1485                             "400 Bad Request",
1486                             "GET unknown URI type: can only do t=info",
1487                             self.GET, base)
1488        return d
1489
1490    def test_GET_FILE_URI(self):
1491        base = "/uri/%s" % urlquote(self._bar_txt_uri)
1492        d = self.GET(base)
1493        d.addCallback(self.failUnlessIsBarDotTxt)
1494        return d
1495
1496    def test_GET_FILE_URI_mdmf(self):
1497        base = "/uri/%s" % urlquote(self._quux_txt_uri)
1498        d = self.GET(base)
1499        d.addCallback(self.failUnlessIsQuuxDotTxt)
1500        return d
1501
1502    def test_GET_FILE_URI_mdmf_extensions(self):
1503        base = "/uri/%s" % urlquote("%s:RANDOMSTUFF" % str(self._quux_txt_uri, "ascii"))
1504        d = self.GET(base)
1505        d.addCallback(self.failUnlessIsQuuxDotTxt)
1506        return d
1507
1508    def test_GET_FILE_URI_mdmf_readonly(self):
1509        base = "/uri/%s" % urlquote(str(self._quux_txt_readonly_uri, "ascii"))
1510        d = self.GET(base)
1511        d.addCallback(self.failUnlessIsQuuxDotTxt)
1512        return d
1513
1514    def test_GET_FILE_URI_badchild(self):
1515        base = "/uri/%s/boguschild" % urlquote(str(self._bar_txt_uri, "ascii"))
1516        errmsg = "Files have no children named 'boguschild'"
1517        d = self.shouldFail2(error.Error, "test_GET_FILE_URI_badchild",
1518                             "400 Bad Request", errmsg,
1519                             self.GET, base)
1520        return d
1521
1522    def test_PUT_FILE_URI_badchild(self):
1523        base = "/uri/%s/boguschild" % urlquote(str(self._bar_txt_uri, "ascii"))
1524        errmsg = "Cannot create directory 'boguschild', because its parent is a file, not a directory"
1525        d = self.shouldFail2(error.Error, "test_GET_FILE_URI_badchild",
1526                             "409 Conflict", errmsg,
1527                             self.PUT, base, "")
1528        return d
1529
1530    def test_PUT_FILE_URI_mdmf(self):
1531        base = "/uri/%s" % urlquote(str(self._quux_txt_uri, "ascii"))
1532        self._quux_new_contents = b"new_contents"
1533        d = self.GET(base)
1534        d.addCallback(lambda res:
1535            self.failUnlessIsQuuxDotTxt(res))
1536        d.addCallback(lambda ignored:
1537            self.PUT(base, self._quux_new_contents))
1538        d.addCallback(lambda ignored:
1539            self.GET(base))
1540        d.addCallback(lambda res:
1541            self.failUnlessReallyEqual(res, self._quux_new_contents))
1542        return d
1543
1544    def test_PUT_FILE_URI_mdmf_extensions(self):
1545        base = "/uri/%s" % urlquote("%s:EXTENSIONSTUFF" % str(self._quux_txt_uri, "ascii"))
1546        self._quux_new_contents = b"new_contents"
1547        d = self.GET(base)
1548        d.addCallback(lambda res: self.failUnlessIsQuuxDotTxt(res))
1549        d.addCallback(lambda ignored: self.PUT(base, self._quux_new_contents))
1550        d.addCallback(lambda ignored: self.GET(base))
1551        d.addCallback(lambda res: self.failUnlessEqual(self._quux_new_contents,
1552                                                       res))
1553        return d
1554
1555    def test_PUT_FILE_URI_mdmf_readonly(self):
1556        # We're not allowed to PUT things to a readonly cap.
1557        base = "/uri/%s" % str(self._quux_txt_readonly_uri, "ascii")
1558        d = self.GET(base)
1559        d.addCallback(lambda res:
1560            self.failUnlessIsQuuxDotTxt(res))
1561        # What should we get here? We get a 500 error now; that's not right.
1562        d.addCallback(lambda ignored:
1563            self.shouldFail2(error.Error, "test_PUT_FILE_URI_mdmf_readonly",
1564                             "400 Bad Request", "read-only cap",
1565                             self.PUT, base, b"new data"))
1566        return d
1567
1568    def test_PUT_FILE_URI_sdmf_readonly(self):
1569        # We're not allowed to put things to a readonly cap.
1570        base = "/uri/%s" % str(self._baz_txt_readonly_uri, "ascii")
1571        d = self.GET(base)
1572        d.addCallback(lambda res:
1573            self.failUnlessIsBazDotTxt(res))
1574        d.addCallback(lambda ignored:
1575            self.shouldFail2(error.Error, "test_PUT_FILE_URI_sdmf_readonly",
1576                             "400 Bad Request", "read-only cap",
1577                             self.PUT, base, b"new_data"))
1578        return d
1579
1580    def test_GET_etags(self):
1581
1582        def _check_etags(uri):
1583            d1 = _get_etag(uri)
1584            d2 = _get_etag(uri, 'json')
1585            d = defer.DeferredList([d1, d2], consumeErrors=True)
1586            def _check(results):
1587                # All deferred must succeed
1588                self.failUnless(all([r[0] for r in results]))
1589                # the etag for the t=json form should be just like the etag
1590                # fo the default t='' form, but with a 'json' suffix
1591                self.failUnlessEqual(results[0][1] + 'json', results[1][1])
1592            d.addCallback(_check)
1593            return d
1594
1595        def _get_etag(uri, t=''):
1596            targetbase = "/uri/%s?t=%s" % (urlquote(uri.strip()), t)
1597            d = self.GET(targetbase, return_response=True, followRedirect=True)
1598            def _just_the_etag(result):
1599                data, response, headers = result
1600                etag = headers.getRawHeaders('etag')[0]
1601                if uri.startswith(b'URI:DIR'):
1602                    self.failUnless(etag.startswith('DIR:'), etag)
1603                return etag
1604            return d.addCallback(_just_the_etag)
1605
1606        # Check that etags work with immutable directories
1607        (newkids, caps) = self._create_immutable_children()
1608        d = self.POST2(self.public_url + "/foo/newdir?t=mkdir-immutable",
1609                       json.dumps(newkids))
1610        def _stash_immdir_uri(uri):
1611            self._immdir_uri = uri
1612            return uri
1613        d.addCallback(_stash_immdir_uri)
1614        d.addCallback(_check_etags)
1615
1616        # Check that etags work with immutable files
1617        d.addCallback(lambda _: _check_etags(self._bar_txt_uri))
1618
1619        # use the ETag on GET
1620        def _check_match(ign):
1621            uri = "/uri/%s" % str(self._bar_txt_uri, "ascii")
1622            d = self.GET(uri, return_response=True)
1623            # extract the ETag
1624            d.addCallback(lambda data_code_headers:
1625                          data_code_headers[2].getRawHeaders('etag')[0])
1626            # do a GET that's supposed to match the ETag
1627            d.addCallback(lambda etag:
1628                          self.GET(uri, return_response=True,
1629                                   headers={"If-None-Match": etag}))
1630            # make sure it short-circuited (304 instead of 200)
1631            d.addCallback(lambda data_code_headers:
1632                          self.failUnlessEqual(int(data_code_headers[1]), http.NOT_MODIFIED))
1633            return d
1634        d.addCallback(_check_match)
1635
1636        def _no_etag(uri, t):
1637            target = "/uri/%s?t=%s" % (str(uri, "ascii"), t)
1638            d = self.GET(target, return_response=True, followRedirect=True)
1639            d.addCallback(lambda data_code_headers:
1640                          self.failIf(data_code_headers[2].hasHeader("etag"), target))
1641            return d
1642        def _yes_etag(uri, t):
1643            target = "/uri/%s?t=%s" % (str(uri, "ascii"), t)
1644            d = self.GET(target, return_response=True, followRedirect=True)
1645            d.addCallback(lambda data_code_headers:
1646                          self.failUnless(data_code_headers[2].hasHeader("etag"), target))
1647            return d
1648
1649        d.addCallback(lambda ign: _yes_etag(self._bar_txt_uri, ""))
1650        d.addCallback(lambda ign: _yes_etag(self._bar_txt_uri, "json"))
1651        d.addCallback(lambda ign: _yes_etag(self._bar_txt_uri, "uri"))
1652        d.addCallback(lambda ign: _yes_etag(self._bar_txt_uri, "readonly-uri"))
1653        d.addCallback(lambda ign: _no_etag(self._bar_txt_uri, "info"))
1654
1655        d.addCallback(lambda ign: _yes_etag(self._immdir_uri, ""))
1656        d.addCallback(lambda ign: _yes_etag(self._immdir_uri, "json"))
1657        d.addCallback(lambda ign: _yes_etag(self._immdir_uri, "uri"))
1658        d.addCallback(lambda ign: _yes_etag(self._immdir_uri, "readonly-uri"))
1659        d.addCallback(lambda ign: _no_etag(self._immdir_uri, "info"))
1660        d.addCallback(lambda ign: _no_etag(self._immdir_uri, "rename-form"))
1661
1662        return d
1663
1664    # TODO: version of this with a Unicode filename
1665    def test_GET_FILEURL_save(self):
1666        d = self.GET(self.public_url + "/foo/bar.txt?filename=bar.txt&save=true",
1667                     return_response=True)
1668        def _got(res_and_status_and_headers):
1669            (res, statuscode, headers) = res_and_status_and_headers
1670            content_disposition = headers.getRawHeaders("content-disposition")[0]
1671            self.failUnless(content_disposition == 'attachment; filename="bar.txt"', content_disposition)
1672            self.failUnlessIsBarDotTxt(res)
1673        d.addCallback(_got)
1674        return d
1675
1676    def test_GET_FILEURL_missing(self):
1677        d = self.GET(self.public_url + "/foo/missing")
1678        d.addBoth(self.should404, "test_GET_FILEURL_missing")
1679        return d
1680
1681    def test_GET_FILEURL_info_mdmf(self):
1682        d = self.GET("/uri/%s?t=info" % str(self._quux_txt_uri, "ascii"))
1683        def _got(res):
1684            self.failUnlessIn(b"mutable file (mdmf)", res)
1685            self.failUnlessIn(self._quux_txt_uri, res)
1686            self.failUnlessIn(self._quux_txt_readonly_uri, res)
1687        d.addCallback(_got)
1688        return d
1689
1690    def test_GET_FILEURL_info_mdmf_readonly(self):
1691        d = self.GET("/uri/%s?t=info" % str(self._quux_txt_readonly_uri, "ascii"))
1692        def _got(res):
1693            self.failUnlessIn(b"mutable file (mdmf)", res)
1694            self.failIfIn(self._quux_txt_uri, res)
1695            self.failUnlessIn(self._quux_txt_readonly_uri, res)
1696        d.addCallback(_got)
1697        return d
1698
1699    def test_GET_FILEURL_info_sdmf(self):
1700        d = self.GET("/uri/%s?t=info" % str(self._baz_txt_uri, "ascii"))
1701        def _got(res):
1702            self.failUnlessIn(b"mutable file (sdmf)", res)
1703            self.failUnlessIn(self._baz_txt_uri, res)
1704        d.addCallback(_got)
1705        return d
1706
1707    def test_GET_FILEURL_info_mdmf_extensions(self):
1708        d = self.GET("/uri/%s:STUFF?t=info" % str(self._quux_txt_uri, "ascii"))
1709        def _got(res):
1710            self.failUnlessIn(b"mutable file (mdmf)", res)
1711            self.failUnlessIn(self._quux_txt_uri, res)
1712            self.failUnlessIn(self._quux_txt_readonly_uri, res)
1713        d.addCallback(_got)
1714        return d
1715
1716    def test_PUT_overwrite_only_files(self):
1717        # create a directory, put a file in that directory.
1718        contents, n, filecap = self.makefile(8)
1719        d = self.PUT(self.public_url + "/foo/dir?t=mkdir", "")
1720        d.addCallback(lambda res:
1721            self.PUT(self.public_url + "/foo/dir/file1.txt",
1722                     self.NEWFILE_CONTENTS))
1723        # try to overwrite the file with replace=only-files
1724        # (this should work)
1725        d.addCallback(lambda res:
1726            self.PUT(self.public_url + "/foo/dir/file1.txt?t=uri&replace=only-files",
1727                     filecap))
1728        d.addCallback(lambda res:
1729            self.shouldFail2(error.Error, "PUT_bad_t", "409 Conflict",
1730                 "There was already a child by that name, and you asked me "
1731                 "to not replace it",
1732                 self.PUT, self.public_url + "/foo/dir?t=uri&replace=only-files",
1733                 filecap))
1734        return d
1735
1736    def test_PUT_NEWFILEURL(self):
1737        d = self.PUT(self.public_url + "/foo/new.txt", self.NEWFILE_CONTENTS)
1738        # TODO: we lose the response code, so we can't check this
1739        #self.failUnlessReallyEqual(responsecode, 201)
1740        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"new.txt")
1741        d.addCallback(lambda res:
1742                      self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
1743                                                      self.NEWFILE_CONTENTS))
1744        return d
1745
1746    def test_PUT_NEWFILEURL_not_mutable(self):
1747        d = self.PUT(self.public_url + "/foo/new.txt?mutable=false",
1748                     self.NEWFILE_CONTENTS)
1749        # TODO: we lose the response code, so we can't check this
1750        #self.failUnlessReallyEqual(responsecode, 201)
1751        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"new.txt")
1752        d.addCallback(lambda res:
1753                      self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
1754                                                      self.NEWFILE_CONTENTS))
1755        return d
1756
1757    def test_PUT_NEWFILEURL_unlinked_mdmf(self):
1758        # this should get us a few segments of an MDMF mutable file,
1759        # which we can then test for.
1760        contents = self.NEWFILE_CONTENTS * 300000
1761        d = self.PUT("/uri?format=mdmf",
1762                     contents)
1763        def _got_filecap(filecap):
1764            self.failUnless(filecap.startswith(b"URI:MDMF"))
1765            return filecap
1766        d.addCallback(_got_filecap)
1767        d.addCallback(lambda filecap: self.GET("/uri/%s?t=json" % str(filecap, "utf-8")))
1768        d.addCallback(lambda json: self.failUnlessIn(b"MDMF", json))
1769        return d
1770
1771    def test_PUT_NEWFILEURL_unlinked_sdmf(self):
1772        contents = self.NEWFILE_CONTENTS * 300000
1773        d = self.PUT("/uri?format=sdmf",
1774                     contents)
1775        d.addCallback(lambda filecap: self.GET("/uri/%s?t=json" % str(filecap, "utf-8")))
1776        d.addCallback(lambda json: self.failUnlessIn(b"SDMF", json))
1777        return d
1778
1779    @inlineCallbacks
1780    def test_PUT_NEWFILEURL_unlinked_bad_format(self):
1781        contents = self.NEWFILE_CONTENTS * 300000
1782        yield self.assertHTTPError(self.webish_url + "/uri?format=foo", 400,
1783                                   "Unknown format: foo",
1784                                   method="put", data=contents)
1785
1786    def test_PUT_NEWFILEURL_range_bad(self):
1787        headers = {"content-range": "bytes 1-10/%d" % len(self.NEWFILE_CONTENTS)}
1788        target = self.public_url + "/foo/new.txt"
1789        d = self.shouldFail2(error.Error, "test_PUT_NEWFILEURL_range_bad",
1790                             "501 Not Implemented",
1791                             "Content-Range in PUT not yet supported",
1792                             # (and certainly not for immutable files)
1793                             self.PUT, target, self.NEWFILE_CONTENTS[1:11],
1794                             headers=headers)
1795        d.addCallback(lambda res:
1796                      self.failIfNodeHasChild(self._foo_node, u"new.txt"))
1797        return d
1798
1799    def test_PUT_NEWFILEURL_mutable(self):
1800        d = self.PUT(self.public_url + "/foo/new.txt?mutable=true",
1801                     self.NEWFILE_CONTENTS)
1802        # TODO: we lose the response code, so we can't check this
1803        #self.failUnlessReallyEqual(responsecode, 201)
1804        def _check_uri(res):
1805            u = uri.from_string_mutable_filenode(res)
1806            self.failUnless(u.is_mutable())
1807            self.failIf(u.is_readonly())
1808            return res
1809        d.addCallback(_check_uri)
1810        d.addCallback(self.failUnlessURIMatchesRWChild, self._foo_node, u"new.txt")
1811        d.addCallback(lambda res:
1812                      self.failUnlessMutableChildContentsAre(self._foo_node,
1813                                                             u"new.txt",
1814                                                             self.NEWFILE_CONTENTS))
1815        return d
1816
1817    def test_PUT_NEWFILEURL_mutable_toobig(self):
1818        # It is okay to upload large mutable files, so we should be able
1819        # to do that.
1820        d = self.PUT(self.public_url + "/foo/new.txt?mutable=true",
1821                     b"b" * (self.s.MUTABLE_SIZELIMIT + 1))
1822        return d
1823
1824    def test_PUT_NEWFILEURL_replace(self):
1825        d = self.PUT(self.public_url + "/foo/bar.txt", self.NEWFILE_CONTENTS)
1826        # TODO: we lose the response code, so we can't check this
1827        #self.failUnlessReallyEqual(responsecode, 200)
1828        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"bar.txt")
1829        d.addCallback(lambda res:
1830                      self.failUnlessChildContentsAre(self._foo_node, u"bar.txt",
1831                                                      self.NEWFILE_CONTENTS))
1832        return d
1833
1834    def test_PUT_NEWFILEURL_bad_t(self):
1835        d = self.shouldFail2(error.Error, "PUT_bad_t", "400 Bad Request",
1836                             "PUT to a file: bad t=bogus",
1837                             self.PUT, self.public_url + "/foo/bar.txt?t=bogus",
1838                             b"contents")
1839        return d
1840
1841    def test_PUT_NEWFILEURL_no_replace(self):
1842        d = self.PUT(self.public_url + "/foo/bar.txt?replace=false",
1843                     self.NEWFILE_CONTENTS)
1844        d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_no_replace",
1845                  "409 Conflict",
1846                  "There was already a child by that name, and you asked me "
1847                  "to not replace it")
1848        return d
1849
1850    def test_PUT_NEWFILEURL_mkdirs(self):
1851        d = self.PUT(self.public_url + "/foo/newdir/new.txt", self.NEWFILE_CONTENTS)
1852        fn = self._foo_node
1853        d.addCallback(self.failUnlessURIMatchesROChild, fn, u"newdir/new.txt")
1854        d.addCallback(lambda res: self.failIfNodeHasChild(fn, u"new.txt"))
1855        d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, u"newdir"))
1856        d.addCallback(lambda res:
1857                      self.failUnlessChildContentsAre(fn, u"newdir/new.txt",
1858                                                      self.NEWFILE_CONTENTS))
1859        return d
1860
1861    def test_PUT_NEWFILEURL_blocked(self):
1862        d = self.PUT(self.public_url + "/foo/blockingfile/new.txt",
1863                     self.NEWFILE_CONTENTS)
1864        d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_blocked",
1865                  "409 Conflict",
1866                  "Unable to create directory 'blockingfile': a file was in the way")
1867        return d
1868
1869    def test_PUT_NEWFILEURL_emptyname(self):
1870        # an empty pathname component (i.e. a double-slash) is disallowed
1871        d = self.shouldFail2(error.Error, "test_PUT_NEWFILEURL_emptyname",
1872                             "400 Bad Request",
1873                             "The webapi does not allow empty pathname components",
1874                             self.PUT, self.public_url + "/foo//new.txt", "")
1875        return d
1876
1877    def test_DELETE_FILEURL(self):
1878        d = self.DELETE(self.public_url + "/foo/bar.txt")
1879        d.addCallback(lambda res:
1880                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
1881        return d
1882
1883    def test_DELETE_FILEURL_missing(self):
1884        d = self.DELETE(self.public_url + "/foo/missing")
1885        d.addBoth(self.should404, "test_DELETE_FILEURL_missing")
1886        return d
1887
1888    def test_DELETE_FILEURL_missing2(self):
1889        d = self.DELETE(self.public_url + "/missing/missing")
1890        d.addBoth(self.should404, "test_DELETE_FILEURL_missing2")
1891        return d
1892
1893    def failUnlessHasBarDotTxtMetadata(self, res):
1894        data = json.loads(res)
1895        self.failUnless(isinstance(data, list))
1896        self.failUnlessIn("metadata", data[1])
1897        self.failUnlessIn("tahoe", data[1]["metadata"])
1898        self.failUnlessIn("linkcrtime", data[1]["metadata"]["tahoe"])
1899        self.failUnlessIn("linkmotime", data[1]["metadata"]["tahoe"])
1900        self.failUnlessReallyEqual(data[1]["metadata"]["tahoe"]["linkcrtime"],
1901                                   self._bar_txt_metadata["tahoe"]["linkcrtime"])
1902
1903    def test_GET_FILEURL_json(self):
1904        # twisted.web.http.parse_qs ignores any query args without an '=', so
1905        # I can't do "GET /path?json", I have to do "GET /path/t=json"
1906        # instead. This may make it tricky to emulate the S3 interface
1907        # completely.
1908        d = self.GET(self.public_url + "/foo/bar.txt?t=json")
1909        def _check1(data):
1910            self.failUnlessIsBarJSON(data)
1911            self.failUnlessHasBarDotTxtMetadata(data)
1912            return
1913        d.addCallback(_check1)
1914        return d
1915
1916    def test_GET_FILEURL_json_mutable_type(self):
1917        # The JSON should include format, which says whether the
1918        # file is SDMF or MDMF
1919        d = self.PUT("/uri?format=mdmf",
1920                     self.NEWFILE_CONTENTS * 300000)
1921        d.addCallback(lambda filecap: self.GET("/uri/%s?t=json" % str(filecap, "ascii")))
1922        def _got_json(raw, version):
1923            data = json.loads(raw)
1924            assert "filenode" == data[0]
1925            data = data[1]
1926            assert isinstance(data, dict)
1927
1928            self.failUnlessIn("format", data)
1929            self.failUnlessEqual(data["format"], version)
1930
1931        d.addCallback(_got_json, "MDMF")
1932        # Now make an SDMF file and check that it is reported correctly.
1933        d.addCallback(lambda ignored:
1934            self.PUT("/uri?format=sdmf",
1935                      self.NEWFILE_CONTENTS * 300000))
1936        d.addCallback(lambda filecap: self.GET("/uri/%s?t=json" % str(filecap, "ascii")))
1937        d.addCallback(_got_json, "SDMF")
1938        return d
1939
1940    def test_GET_FILEURL_json_mdmf(self):
1941        d = self.GET("/uri/%s?t=json" % urlquote(str(self._quux_txt_uri, "ascii")))
1942        d.addCallback(self.failUnlessIsQuuxJSON)
1943        return d
1944
1945    def test_GET_FILEURL_json_missing(self):
1946        d = self.GET(self.public_url + "/foo/missing?json")
1947        d.addBoth(self.should404, "test_GET_FILEURL_json_missing")
1948        return d
1949
1950    def test_GET_FILEURL_uri(self):
1951        d = self.GET(self.public_url + "/foo/bar.txt?t=uri")
1952        def _check(res):
1953            self.failUnlessReallyEqual(res, self._bar_txt_uri)
1954        d.addCallback(_check)
1955        d.addCallback(lambda res:
1956                      self.GET(self.public_url + "/foo/bar.txt?t=readonly-uri"))
1957        def _check2(res):
1958            # for now, for files, uris and readonly-uris are the same
1959            self.failUnlessReallyEqual(res, self._bar_txt_uri)
1960        d.addCallback(_check2)
1961        return d
1962
1963    @inlineCallbacks
1964    def test_GET_FILEURL_badtype(self):
1965        url = self.webish_url + self.public_url + "/foo/bar.txt?t=bogus"
1966        yield self.assertHTTPError(url, 400, "bad t=bogus")
1967
1968    def test_CSS_FILE(self):
1969        d = self.GET("/tahoe.css", followRedirect=True)
1970        def _check(res):
1971            CSS_STYLE=re.compile(b'toolbar\s{.+text-align:\scenter.+toolbar-item.+display:\sinline',re.DOTALL)
1972            self.failUnless(CSS_STYLE.search(res), res)
1973        d.addCallback(_check)
1974        return d
1975
1976    def test_GET_FILEURL_uri_missing(self):
1977        d = self.GET(self.public_url + "/foo/missing?t=uri")
1978        d.addBoth(self.should404, "test_GET_FILEURL_uri_missing")
1979        return d
1980
1981    def _check_upload_and_mkdir_forms(self, soup):
1982        """
1983        Confirm `soup` contains a form to create a file, with radio
1984        buttons that allow the user to toggle whether it is a CHK/LIT
1985        (default), SDMF, or MDMF file.
1986        """
1987        found = []
1988        desired_ids = (
1989            u"upload-chk",
1990            u"upload-sdmf",
1991            u"upload-mdmf",
1992            u"mkdir-sdmf",
1993            u"mkdir-mdmf",
1994        )
1995        for input_tag in soup.find_all(u"input"):
1996            if input_tag.get(u"id", u"") in desired_ids:
1997                found.append(input_tag)
1998            else:
1999                if input_tag.get(u"name", u"") == u"t" and input_tag.get(u"type", u"") == u"hidden":
2000                    if input_tag[u"value"] == u"upload":
2001                        found.append(input_tag)
2002                    elif input_tag[u"value"] == u"mkdir":
2003                        found.append(input_tag)
2004        self.assertEqual(len(found), 7, u"Failed to find all 7 <input> tags")
2005        assert_soup_has_favicon(self, soup)
2006
2007    @inlineCallbacks
2008    def test_GET_DIRECTORY_html(self):
2009        data = yield self.GET(self.public_url + "/foo", followRedirect=True)
2010        soup = BeautifulSoup(data, 'html5lib')
2011        self._check_upload_and_mkdir_forms(soup)
2012        toolbars = soup.find_all(u"li", {u"class": u"toolbar-item"})
2013        self.assertTrue(any(li.text == u"Return to Welcome page" for li in toolbars))
2014        self.failUnlessIn(b"quux", data)
2015
2016    @inlineCallbacks
2017    def test_GET_DIRECTORY_html_filenode_encoding(self):
2018        data = yield self.GET(self.public_url + "/foo", followRedirect=True)
2019        soup = BeautifulSoup(data, 'html5lib')
2020        # Check if encoded entries are there
2021        target_ref = u'@@named=/{}'.format(self._htmlname_urlencoded)
2022        # at least one <a> tag has our weirdly-named file properly
2023        # encoded (or else BeautifulSoup would produce an error)
2024        self.assertTrue(
2025            any(
2026                a.text == self._htmlname_unicode and a[u"href"].endswith(target_ref)
2027                for a in soup.find_all(u"a", {u"rel": u"noreferrer"})
2028            )
2029        )
2030
2031    @inlineCallbacks
2032    def test_GET_root_html(self):
2033        data = yield self.GET("/")
2034        soup = BeautifulSoup(data, 'html5lib')
2035        self._check_upload_and_mkdir_forms(soup)
2036
2037    @inlineCallbacks
2038    def test_GET_DIRURL(self):
2039        data = yield self.GET(self.public_url + "/foo", followRedirect=True)
2040        soup = BeautifulSoup(data, 'html5lib')
2041
2042        # from /uri/$URI/foo/ , we need ../../../ to get back to the root
2043        root = u"../../.."
2044        self.assertTrue(
2045            any(
2046                a.text == u"Return to Welcome page"
2047                for a in soup.find_all(u"a", {u"href": root})
2048            )
2049        )
2050
2051        # the FILE reference points to a URI, but it should end in bar.txt
2052        bar_url = "{}/file/{}/@@named=/bar.txt".format(root, urlquote(self._bar_txt_uri))
2053        self.assertTrue(
2054            any(
2055                a.text == u"bar.txt"
2056                for a in soup.find_all(u"a", {u"href": bar_url})
2057            )
2058        )
2059        self.assertTrue(
2060            any(
2061                td.text == u"{}".format(len(self.BAR_CONTENTS))
2062                for td in soup.find_all(u"td", {u"align": u"right"})
2063            )
2064        )
2065        foo_url = urlquote("{}/uri/{}/".format(root, str(self._foo_uri, "ascii")))
2066        forms = soup.find_all(u"form", {u"action": foo_url})
2067        found = []
2068        for form in forms:
2069            if form.find_all(u"input", {u"name": u"name", u"value": u"bar.txt"}):
2070                kind = form.find_all(u"input", {u"type": u"submit"})[0][u"value"]
2071                found.append(kind)
2072                if kind == u"unlink":
2073                    self.assertTrue(form[u"method"] == u"post")
2074        self.assertEqual(
2075            set(found),
2076            {u"unlink", u"rename/relink"}
2077        )
2078
2079        sub_url = "{}/uri/{}/".format(root, urlquote(self._sub_uri))
2080        self.assertTrue(
2081            any(
2082                td.findNextSibling()(u"a")[0][u"href"] == sub_url
2083                for td in soup.find_all(u"td")
2084                if td.text == u"DIR"
2085            )
2086        )
2087
2088    @inlineCallbacks
2089    def test_GET_DIRURL_readonly(self):
2090        # look at a readonly directory
2091        data = yield self.GET(self.public_url + "/reedownlee", followRedirect=True)
2092        self.failUnlessIn(b"(read-only)", data)
2093        self.failIfIn(b"Upload a file", data)
2094
2095    @inlineCallbacks
2096    def test_GET_DIRURL_readonly_dir(self):
2097        # look at a directory that contains a readonly directory
2098        data = yield self.GET(self.public_url, followRedirect=True)
2099        soup = BeautifulSoup(data, 'html5lib')
2100        ro_links = list(
2101            td.findNextSibling()(u"a")[0]
2102            for td in soup.find_all(u"td")
2103            if td.text == u"DIR-RO"
2104        )
2105        self.assertEqual(1, len(ro_links))
2106        self.assertEqual(u"reedownlee", ro_links[0].text)
2107        self.assertTrue(u"URI%3ADIR2-RO%3A" in ro_links[0][u"href"])
2108
2109    @inlineCallbacks
2110    def test_GET_DIRURL_empty(self):
2111        # look at an empty directory
2112        data = yield self.GET(self.public_url + "/foo/empty")
2113        soup = BeautifulSoup(data, 'html5lib')
2114        self.failUnlessIn(b"directory is empty", data)
2115        mkdir_inputs = soup.find_all(u"input", {u"type": u"hidden", u"name": u"t", u"value": u"mkdir"})
2116        self.assertEqual(1, len(mkdir_inputs))
2117        self.assertEqual(
2118            u"Create a new directory in this directory",
2119            mkdir_inputs[0].parent(u"legend")[0].text
2120        )
2121
2122    @inlineCallbacks
2123    def test_GET_DIRURL_literal(self):
2124        # look at a literal directory
2125        tiny_litdir_uri = "URI:DIR2-LIT:gqytunj2onug64tufqzdcosvkjetutcjkq5gw4tvm5vwszdgnz5hgyzufqydulbshj5x2lbm" # contains one child which is itself also LIT
2126        data = yield self.GET("/uri/" + tiny_litdir_uri, followRedirect=True)
2127        soup = BeautifulSoup(data, 'html5lib')
2128        self.failUnlessIn(b'(immutable)', data)
2129        file_links = list(
2130            td.findNextSibling()(u"a")[0]
2131            for td in soup.find_all(u"td")
2132            if td.text == u"FILE"
2133        )
2134        self.assertEqual(1, len(file_links))
2135        self.assertEqual(u"short", file_links[0].text)
2136        self.assertTrue(file_links[0][u"href"].endswith(u"/file/URI%3ALIT%3Akrugkidfnzsc4/@@named=/short"))
2137
2138    @inlineCallbacks
2139    def test_GET_DIRURL_badtype(self):
2140        url = self.webish_url + self.public_url + "/foo?t=bogus"
2141        yield self.assertHTTPError(url, 400, b"bad t=bogus")
2142
2143    def test_GET_DIRURL_json(self):
2144        d = self.GET(self.public_url + "/foo?t=json")
2145        d.addCallback(self.failUnlessIsFooJSON)
2146        return d
2147
2148    def test_GET_DIRURL_json_format(self):
2149        d = self.PUT(self.public_url + \
2150                     "/foo/sdmf.txt?format=sdmf",
2151                     self.NEWFILE_CONTENTS * 300000)
2152        d.addCallback(lambda ignored:
2153            self.PUT(self.public_url + \
2154                     "/foo/mdmf.txt?format=mdmf",
2155                     self.NEWFILE_CONTENTS * 300000))
2156        # Now we have an MDMF and SDMF file in the directory. If we GET
2157        # its JSON, we should see their encodings.
2158        d.addCallback(lambda ignored:
2159            self.GET(self.public_url + "/foo?t=json"))
2160        def _got_json(raw):
2161            data = json.loads(raw)
2162            assert data[0] == "dirnode"
2163
2164            data = data[1]
2165            kids = data['children']
2166
2167            mdmf_data = kids['mdmf.txt'][1]
2168            self.failUnlessIn("format", mdmf_data)
2169            self.failUnlessEqual(mdmf_data["format"], "MDMF")
2170
2171            sdmf_data = kids['sdmf.txt'][1]
2172            self.failUnlessIn("format", sdmf_data)
2173            self.failUnlessEqual(sdmf_data["format"], "SDMF")
2174        d.addCallback(_got_json)
2175        return d
2176
2177
2178    def test_POST_DIRURL_manifest_no_ophandle(self):
2179        d = self.shouldFail2(error.Error,
2180                             "test_POST_DIRURL_manifest_no_ophandle",
2181                             "400 Bad Request",
2182                             "slow operation requires ophandle=",
2183                             self.POST, self.public_url, t="start-manifest")
2184        return d
2185
2186    def test_POST_DIRURL_manifest(self):
2187        d = defer.succeed(None)
2188        def getman(ignored, output):
2189            url = self.webish_url + self.public_url + "/foo?t=start-manifest&ophandle=125"
2190            d = do_http("post", url, allow_redirects=True,
2191                        browser_like_redirects=True)
2192            d.addCallback(self.wait_for_operation, "125")
2193            d.addCallback(self.get_operation_results, "125", output)
2194            return d
2195        d.addCallback(getman, None)
2196        def _got_html(manifest):
2197            soup = BeautifulSoup(manifest, 'html5lib')
2198            assert_soup_has_text(self, soup, "Manifest of SI=")
2199            assert_soup_has_text(self, soup, "sub")
2200            assert_soup_has_text(self, soup, str(self._sub_uri, "ascii"))
2201            assert_soup_has_text(self, soup, "sub/baz.txt")
2202            assert_soup_has_favicon(self, soup)
2203        d.addCallback(_got_html)
2204
2205        # both t=status and unadorned GET should be identical
2206        d.addCallback(lambda res: self.GET("/operations/125"))
2207        d.addCallback(_got_html)
2208
2209        d.addCallback(getman, "html")
2210        d.addCallback(_got_html)
2211        d.addCallback(getman, "text")
2212        def _got_text(manifest):
2213            self.failUnlessIn(b"\nsub " + self._sub_uri + b"\n", manifest)
2214            self.failUnlessIn(b"\nsub/baz.txt URI:CHK:", manifest)
2215        d.addCallback(_got_text)
2216        d.addCallback(getman, "JSON")
2217        def _got_json(res):
2218            data = res["manifest"]
2219            got = {}
2220            for (path_list, cap) in data:
2221                got[tuple(path_list)] = cap
2222            self.failUnlessReallyEqual(to_bytes(got[(u"sub",)]), self._sub_uri)
2223            self.failUnlessIn((u"sub", u"baz.txt"), got)
2224            self.failUnlessIn("finished", res)
2225            self.failUnlessIn("origin", res)
2226            self.failUnlessIn("storage-index", res)
2227            self.failUnlessIn("verifycaps", res)
2228            self.failUnlessIn("stats", res)
2229        d.addCallback(_got_json)
2230        return d
2231
2232    def test_POST_DIRURL_deepsize_no_ophandle(self):
2233        d = self.shouldFail2(error.Error,
2234                             "test_POST_DIRURL_deepsize_no_ophandle",
2235                             "400 Bad Request",
2236                             "slow operation requires ophandle=",
2237                             self.POST, self.public_url, t="start-deep-size")
2238        return d
2239
2240    def test_POST_DIRURL_deepsize(self):
2241        url = self.webish_url + self.public_url + "/foo?t=start-deep-size&ophandle=126"
2242        d = do_http("post", url, allow_redirects=True,
2243                    browser_like_redirects=True)
2244        d.addCallback(self.wait_for_operation, "126")
2245        d.addCallback(self.get_operation_results, "126", "json")
2246        def _got_json(data):
2247            self.failUnlessReallyEqual(data["finished"], True)
2248            size = data["size"]
2249            # Match calculation of text value size below:
2250            self.failUnless(
2251                size.get("size-directories", 0) + size.get("size-mutable-files", 0) +
2252                size.get("size-immutable-files", 0) > 1000)
2253        d.addCallback(_got_json)
2254        d.addCallback(self.get_operation_results, "126", "text")
2255        def _got_text(res):
2256            mo = re.search(br'^size: (\d+)$', res, re.M)
2257            self.failUnless(mo, res)
2258            size = int(mo.group(1))
2259            # with directories, the size varies.
2260            self.failUnless(size > 1000)
2261        d.addCallback(_got_text)
2262        return d
2263
2264    def test_POST_DIRURL_deepstats_no_ophandle(self):
2265        d = self.shouldFail2(error.Error,
2266                             "test_POST_DIRURL_deepstats_no_ophandle",
2267                             "400 Bad Request",
2268                             "slow operation requires ophandle=",
2269                             self.POST, self.public_url, t="start-deep-stats")
2270        return d
2271
2272    def test_POST_DIRURL_deepstats(self):
2273        url = self.webish_url + self.public_url + "/foo?t=start-deep-stats&ophandle=127"
2274        d = do_http("post", url,
2275                    allow_redirects=True, browser_like_redirects=True)
2276        d.addCallback(self.wait_for_operation, "127")
2277        d.addCallback(self.get_operation_results, "127", "json")
2278        def _got_json(stats):
2279            expected = {"count-immutable-files": 4,
2280                        "count-mutable-files": 2,
2281                        "count-literal-files": 0,
2282                        "count-files": 6,
2283                        "count-directories": 3,
2284                        "size-immutable-files": 76,
2285                        "size-literal-files": 0,
2286                        #"size-directories": 1912, # varies
2287                        #"largest-directory": 1590,
2288                        "largest-directory-children": 8,
2289                        "largest-immutable-file": 19,
2290                        "api-version": 1,
2291                        }
2292            for k,v in list(expected.items()):
2293                self.failUnlessReallyEqual(stats[k], v,
2294                                           "stats[%s] was %s, not %s" %
2295                                           (k, stats[k], v))
2296            self.failUnlessReallyEqual(stats["size-files-histogram"],
2297                                       [ [11, 31, 4] ])
2298        d.addCallback(_got_json)
2299        return d
2300
2301    def test_POST_DIRURL_stream_manifest(self):
2302        d = self.POST(self.public_url + "/foo?t=stream-manifest")
2303        def _check(res):
2304            self.failUnless(res.endswith(b"\n"))
2305            units = [json.loads(t) for t in res[:-1].split(b"\n")]
2306            self.failUnlessReallyEqual(len(units), 10)
2307            self.failUnlessEqual(units[-1]["type"], "stats")
2308            first = units[0]
2309            self.failUnlessEqual(first["path"], [])
2310            self.failUnlessReallyEqual(to_bytes(first["cap"]), self._foo_uri)
2311            self.failUnlessEqual(first["type"], "directory")
2312            baz = [u for u in units[:-1] if to_bytes(u["cap"]) == self._baz_file_uri][0]
2313            self.failUnlessEqual(baz["path"], ["sub", "baz.txt"])
2314            self.failIfEqual(baz["storage-index"], None)
2315            self.failIfEqual(baz["verifycap"], None)
2316            self.failIfEqual(baz["repaircap"], None)
2317            # XXX: Add quux and baz to this test.
2318            return
2319        d.addCallback(_check)
2320        return d
2321
2322    def test_GET_DIRURL_uri(self):
2323        d = self.GET(self.public_url + "/foo?t=uri")
2324        def _check(res):
2325            self.failUnlessReallyEqual(to_bytes(res), self._foo_uri)
2326        d.addCallback(_check)
2327        return d
2328
2329    def test_GET_DIRURL_readonly_uri(self):
2330        d = self.GET(self.public_url + "/foo?t=readonly-uri")
2331        def _check(res):
2332            self.failUnlessReallyEqual(to_bytes(res), self._foo_readonly_uri)
2333        d.addCallback(_check)
2334        return d
2335
2336    def test_PUT_NEWDIRURL(self):
2337        d = self.PUT(self.public_url + "/foo/newdir?t=mkdir", "")
2338        d.addCallback(lambda res:
2339                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2340        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2341        d.addCallback(self.failUnlessNodeKeysAre, [])
2342        return d
2343
2344    def test_PUT_NEWDIRURL_mdmf(self):
2345        d = self.PUT(self.public_url + "/foo/newdir?t=mkdir&format=mdmf", "")
2346        d.addCallback(lambda res:
2347                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2348        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2349        d.addCallback(lambda node:
2350            self.failUnlessEqual(node._node.get_version(), MDMF_VERSION))
2351        return d
2352
2353    def test_PUT_NEWDIRURL_sdmf(self):
2354        d = self.PUT(self.public_url + "/foo/newdir?t=mkdir&format=sdmf",
2355                     "")
2356        d.addCallback(lambda res:
2357                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2358        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2359        d.addCallback(lambda node:
2360            self.failUnlessEqual(node._node.get_version(), SDMF_VERSION))
2361        return d
2362
2363    @inlineCallbacks
2364    def test_PUT_NEWDIRURL_bad_format(self):
2365        url = (self.webish_url + self.public_url +
2366               "/foo/newdir=?t=mkdir&format=foo")
2367        yield self.assertHTTPError(url, 400, "Unknown format: foo",
2368                                   method="put", data="")
2369
2370    def test_POST_NEWDIRURL(self):
2371        d = self.POST2(self.public_url + "/foo/newdir?t=mkdir", "")
2372        d.addCallback(lambda res:
2373                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2374        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2375        d.addCallback(self.failUnlessNodeKeysAre, [])
2376        return d
2377
2378    def test_POST_NEWDIRURL_mdmf(self):
2379        d = self.POST2(self.public_url + "/foo/newdir?t=mkdir&format=mdmf", "")
2380        d.addCallback(lambda res:
2381                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2382        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2383        d.addCallback(lambda node:
2384            self.failUnlessEqual(node._node.get_version(), MDMF_VERSION))
2385        return d
2386
2387    def test_POST_NEWDIRURL_sdmf(self):
2388        d = self.POST2(self.public_url + "/foo/newdir?t=mkdir&format=sdmf", "")
2389        d.addCallback(lambda res:
2390            self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2391        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2392        d.addCallback(lambda node:
2393            self.failUnlessEqual(node._node.get_version(), SDMF_VERSION))
2394        return d
2395
2396    @inlineCallbacks
2397    def test_POST_NEWDIRURL_bad_format(self):
2398        url = (self.webish_url + self.public_url +
2399               "/foo/newdir?t=mkdir&format=foo")
2400        yield self.assertHTTPError(url, 400, "Unknown format: foo",
2401                                   method="post", data="")
2402
2403    def test_POST_NEWDIRURL_emptyname(self):
2404        # an empty pathname component (i.e. a double-slash) is disallowed
2405        d = self.shouldFail2(error.Error, "POST_NEWDIRURL_emptyname",
2406                             "400 Bad Request",
2407                             "The webapi does not allow empty pathname components, i.e. a double slash",
2408                             self.POST, self.public_url + "//?t=mkdir")
2409        return d
2410
2411    def _do_POST_NEWDIRURL_initial_children_test(self, version=None):
2412        (newkids, caps) = self._create_initial_children()
2413        query = "/foo/newdir?t=mkdir-with-children"
2414        if version == MDMF_VERSION:
2415            query += "&format=mdmf"
2416        elif version == SDMF_VERSION:
2417            query += "&format=sdmf"
2418        else:
2419            version = SDMF_VERSION # for later
2420        d = self.POST2(self.public_url + query,
2421                       json.dumps(newkids))
2422        def _check(uri):
2423            n = self.s.create_node_from_uri(uri.strip())
2424            d2 = self.failUnlessNodeKeysAre(n, list(newkids.keys()))
2425            self.failUnlessEqual(n._node.get_version(), version)
2426            d2.addCallback(lambda ign:
2427                           self.failUnlessROChildURIIs(n, u"child-imm",
2428                                                       caps['filecap1']))
2429            d2.addCallback(lambda ign:
2430                           self.failUnlessRWChildURIIs(n, u"child-mutable",
2431                                                       caps['filecap2']))
2432            d2.addCallback(lambda ign:
2433                           self.failUnlessROChildURIIs(n, u"child-mutable-ro",
2434                                                       caps['filecap3']))
2435            d2.addCallback(lambda ign:
2436                           self.failUnlessROChildURIIs(n, u"unknownchild-ro",
2437                                                       caps['unknown_rocap']))
2438            d2.addCallback(lambda ign:
2439                           self.failUnlessRWChildURIIs(n, u"unknownchild-rw",
2440                                                       caps['unknown_rwcap']))
2441            d2.addCallback(lambda ign:
2442                           self.failUnlessROChildURIIs(n, u"unknownchild-imm",
2443                                                       caps['unknown_immcap']))
2444            d2.addCallback(lambda ign:
2445                           self.failUnlessRWChildURIIs(n, u"dirchild",
2446                                                       caps['dircap']))
2447            d2.addCallback(lambda ign:
2448                           self.failUnlessROChildURIIs(n, u"dirchild-lit",
2449                                                       caps['litdircap']))
2450            d2.addCallback(lambda ign:
2451                           self.failUnlessROChildURIIs(n, u"dirchild-empty",
2452                                                       caps['emptydircap']))
2453            return d2
2454        d.addCallback(_check)
2455        d.addCallback(lambda res:
2456                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2457        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2458        d.addCallback(self.failUnlessNodeKeysAre, list(newkids.keys()))
2459        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2460        d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
2461        return d
2462
2463    def test_POST_NEWDIRURL_initial_children(self):
2464        return self._do_POST_NEWDIRURL_initial_children_test()
2465
2466    def test_POST_NEWDIRURL_initial_children_mdmf(self):
2467        return self._do_POST_NEWDIRURL_initial_children_test(MDMF_VERSION)
2468
2469    def test_POST_NEWDIRURL_initial_children_sdmf(self):
2470        return self._do_POST_NEWDIRURL_initial_children_test(SDMF_VERSION)
2471
2472    @inlineCallbacks
2473    def test_POST_NEWDIRURL_initial_children_bad_format(self):
2474        (newkids, caps) = self._create_initial_children()
2475        url = (self.webish_url + self.public_url +
2476               "/foo/newdir?t=mkdir-with-children&format=foo")
2477        yield self.assertHTTPError(url, 400, "Unknown format: foo",
2478                                   method="post", data=json.dumps(newkids).encode("utf-8"))
2479
2480    def test_POST_NEWDIRURL_immutable(self):
2481        (newkids, caps) = self._create_immutable_children()
2482        d = self.POST2(self.public_url + "/foo/newdir?t=mkdir-immutable",
2483                       json.dumps(newkids))
2484        def _check(uri):
2485            n = self.s.create_node_from_uri(uri.strip())
2486            d2 = self.failUnlessNodeKeysAre(n, list(newkids.keys()))
2487            d2.addCallback(lambda ign:
2488                           self.failUnlessROChildURIIs(n, u"child-imm",
2489                                                       caps['filecap1']))
2490            d2.addCallback(lambda ign:
2491                           self.failUnlessROChildURIIs(n, u"unknownchild-imm",
2492                                                       caps['unknown_immcap']))
2493            d2.addCallback(lambda ign:
2494                           self.failUnlessROChildURIIs(n, u"dirchild-imm",
2495                                                       caps['immdircap']))
2496            d2.addCallback(lambda ign:
2497                           self.failUnlessROChildURIIs(n, u"dirchild-lit",
2498                                                       caps['litdircap']))
2499            d2.addCallback(lambda ign:
2500                           self.failUnlessROChildURIIs(n, u"dirchild-empty",
2501                                                       caps['emptydircap']))
2502            return d2
2503        d.addCallback(_check)
2504        d.addCallback(lambda res:
2505                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
2506        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2507        d.addCallback(self.failUnlessNodeKeysAre, list(newkids.keys()))
2508        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2509        d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
2510        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2511        d.addCallback(self.failUnlessROChildURIIs, u"unknownchild-imm", caps['unknown_immcap'])
2512        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2513        d.addCallback(self.failUnlessROChildURIIs, u"dirchild-imm", caps['immdircap'])
2514        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2515        d.addCallback(self.failUnlessROChildURIIs, u"dirchild-lit", caps['litdircap'])
2516        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
2517        d.addCallback(self.failUnlessROChildURIIs, u"dirchild-empty", caps['emptydircap'])
2518        d.addErrback(self.explain_web_error)
2519        return d
2520
2521    def test_POST_NEWDIRURL_immutable_bad(self):
2522        (newkids, caps) = self._create_initial_children()
2523        d = self.shouldFail2(error.Error, "test_POST_NEWDIRURL_immutable_bad",
2524                             "400 Bad Request",
2525                             "needed to be immutable but was not",
2526                             self.POST2,
2527                             self.public_url + "/foo/newdir?t=mkdir-immutable",
2528                             json.dumps(newkids))
2529        return d
2530
2531    def test_PUT_NEWDIRURL_exists(self):
2532        d = self.PUT(self.public_url + "/foo/sub?t=mkdir", "")
2533        d.addCallback(lambda res:
2534                      self.failUnlessNodeHasChild(self._foo_node, u"sub"))
2535        d.addCallback(lambda res: self._foo_node.get(u"sub"))
2536        d.addCallback(self.failUnlessNodeKeysAre, [u"baz.txt"])
2537        return d
2538
2539    def test_PUT_NEWDIRURL_blocked(self):
2540        d = self.shouldFail2(error.Error, "PUT_NEWDIRURL_blocked",
2541                             "409 Conflict", "Unable to create directory 'bar.txt': a file was in the way",
2542                             self.PUT,
2543                             self.public_url + "/foo/bar.txt/sub?t=mkdir", "")
2544        d.addCallback(lambda res:
2545                      self.failUnlessNodeHasChild(self._foo_node, u"sub"))
2546        d.addCallback(lambda res: self._foo_node.get(u"sub"))
2547        d.addCallback(self.failUnlessNodeKeysAre, [u"baz.txt"])
2548        return d
2549
2550    def test_PUT_NEWDIRURL_mkdirs(self):
2551        d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir", "")
2552        d.addCallback(lambda res:
2553                      self.failIfNodeHasChild(self._foo_node, u"newdir"))
2554        d.addCallback(lambda res:
2555                      self.failUnlessNodeHasChild(self._foo_node, u"subdir"))
2556        d.addCallback(lambda res:
2557                      self._foo_node.get_child_at_path(u"subdir/newdir"))
2558        d.addCallback(self.failUnlessNodeKeysAre, [])
2559        return d
2560
2561    def test_PUT_NEWDIRURL_mkdirs_mdmf(self):
2562        d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir&format=mdmf", "")
2563        d.addCallback(lambda ignored:
2564            self.failUnlessNodeHasChild(self._foo_node, u"subdir"))
2565        d.addCallback(lambda ignored:
2566            self.failIfNodeHasChild(self._foo_node, u"newdir"))
2567        d.addCallback(lambda ignored:
2568            self._foo_node.get_child_at_path(u"subdir"))
2569        def _got_subdir(subdir):
2570            # XXX: What we want?
2571            #self.failUnlessEqual(subdir._node.get_version(), MDMF_VERSION)
2572            self.failUnlessNodeHasChild(subdir, u"newdir")
2573            return subdir.get_child_at_path(u"newdir")
2574        d.addCallback(_got_subdir)
2575        d.addCallback(lambda newdir:
2576            self.failUnlessEqual(newdir._node.get_version(), MDMF_VERSION))
2577        return d
2578
2579    def test_PUT_NEWDIRURL_mkdirs_sdmf(self):
2580        d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir&format=sdmf", "")
2581        d.addCallback(lambda ignored:
2582            self.failUnlessNodeHasChild(self._foo_node, u"subdir"))
2583        d.addCallback(lambda ignored:
2584            self.failIfNodeHasChild(self._foo_node, u"newdir"))
2585        d.addCallback(lambda ignored:
2586            self._foo_node.get_child_at_path(u"subdir"))
2587        def _got_subdir(subdir):
2588            # XXX: What we want?
2589            #self.failUnlessEqual(subdir._node.get_version(), MDMF_VERSION)
2590            self.failUnlessNodeHasChild(subdir, u"newdir")
2591            return subdir.get_child_at_path(u"newdir")
2592        d.addCallback(_got_subdir)
2593        d.addCallback(lambda newdir:
2594            self.failUnlessEqual(newdir._node.get_version(), SDMF_VERSION))
2595        return d
2596
2597    @inlineCallbacks
2598    def test_PUT_NEWDIRURL_mkdirs_bad_format(self):
2599        url = (self.webish_url + self.public_url +
2600               "/foo/subdir/newdir?t=mkdir&format=foo")
2601        yield self.assertHTTPError(url, 400, "Unknown format: foo",
2602                                   method="put", data="")
2603
2604    def test_DELETE_DIRURL(self):
2605        d = self.DELETE(self.public_url + "/foo")
2606        d.addCallback(lambda res:
2607                      self.failIfNodeHasChild(self.public_root, u"foo"))
2608        return d
2609
2610    def test_DELETE_DIRURL_missing(self):
2611        d = self.DELETE(self.public_url + "/foo/missing")
2612        d.addBoth(self.should404, "test_DELETE_DIRURL_missing")
2613        d.addCallback(lambda res:
2614                      self.failUnlessNodeHasChild(self.public_root, u"foo"))
2615        return d
2616
2617    def test_DELETE_DIRURL_missing2(self):
2618        d = self.DELETE(self.public_url + "/missing")
2619        d.addBoth(self.should404, "test_DELETE_DIRURL_missing2")
2620        return d
2621
2622    def dump_root(self):
2623        print("NODEWALK")
2624        w = webish.DirnodeWalkerMixin()
2625        def visitor(childpath, childnode, metadata):
2626            print(childpath)
2627        d = w.walk(self.public_root, visitor)
2628        return d
2629
2630    def failUnlessNodeKeysAre(self, node, expected_keys):
2631        for k in expected_keys:
2632            assert isinstance(k, str)
2633        d = node.list()
2634        def _check(children):
2635            self.failUnlessReallyEqual(sorted(children.keys()), sorted(expected_keys))
2636        d.addCallback(_check)
2637        return d
2638    def failUnlessNodeHasChild(self, node, name):
2639        assert isinstance(name, str)
2640        d = node.list()
2641        def _check(children):
2642            self.failUnlessIn(name, children)
2643        d.addCallback(_check)
2644        return d
2645    def failIfNodeHasChild(self, node, name):
2646        assert isinstance(name, str)
2647        d = node.list()
2648        def _check(children):
2649            self.failIfIn(name, children)
2650        d.addCallback(_check)
2651        return d
2652
2653    def failUnlessChildContentsAre(self, node, name, expected_contents):
2654        assert isinstance(name, str)
2655        d = node.get_child_at_path(name)
2656        d.addCallback(lambda node: download_to_data(node))
2657        def _check(contents):
2658            self.failUnlessReallyEqual(contents, expected_contents)
2659        d.addCallback(_check)
2660        return d
2661
2662    def failUnlessMutableChildContentsAre(self, node, name, expected_contents):
2663        assert isinstance(name, str)
2664        d = node.get_child_at_path(name)
2665        d.addCallback(lambda node: node.download_best_version())
2666        def _check(contents):
2667            self.failUnlessReallyEqual(contents, expected_contents)
2668        d.addCallback(_check)
2669        return d
2670
2671    def failUnlessRWChildURIIs(self, node, name, expected_uri):
2672        assert isinstance(name, str)
2673        d = node.get_child_at_path(name)
2674        def _check(child):
2675            self.failUnless(child.is_unknown() or not child.is_readonly())
2676            self.failUnlessReallyEqual(child.get_uri(), expected_uri.strip())
2677            self.failUnlessReallyEqual(child.get_write_uri(), expected_uri.strip())
2678            expected_ro_uri = self._make_readonly(expected_uri)
2679            if expected_ro_uri:
2680                self.failUnlessReallyEqual(child.get_readonly_uri(), expected_ro_uri.strip())
2681        d.addCallback(_check)
2682        return d
2683
2684    def failUnlessROChildURIIs(self, node, name, expected_uri):
2685        assert isinstance(name, str)
2686        d = node.get_child_at_path(name)
2687        def _check(child):
2688            self.failUnless(child.is_unknown() or child.is_readonly())
2689            self.failUnlessReallyEqual(child.get_write_uri(), None)
2690            self.failUnlessReallyEqual(child.get_uri(), ensure_binary(expected_uri.strip()))
2691            self.failUnlessReallyEqual(child.get_readonly_uri(), ensure_binary(expected_uri.strip()))
2692        d.addCallback(_check)
2693        return d
2694
2695    def failUnlessURIMatchesRWChild(self, got_uri, node, name):
2696        assert isinstance(name, str)
2697        d = node.get_child_at_path(name)
2698        def _check(child):
2699            self.failUnless(child.is_unknown() or not child.is_readonly())
2700            self.failUnlessReallyEqual(child.get_uri(), ensure_binary(got_uri.strip()))
2701            self.failUnlessReallyEqual(child.get_write_uri(), ensure_binary(got_uri.strip()))
2702            expected_ro_uri = self._make_readonly(got_uri)
2703            if expected_ro_uri:
2704                self.failUnlessReallyEqual(child.get_readonly_uri(), expected_ro_uri.strip())
2705        d.addCallback(_check)
2706        return d
2707
2708    def failUnlessURIMatchesROChild(self, got_uri, node, name):
2709        assert isinstance(name, str)
2710        d = node.get_child_at_path(name)
2711        def _check(child):
2712            self.failUnless(child.is_unknown() or child.is_readonly())
2713            self.failUnlessReallyEqual(child.get_write_uri(), None)
2714            self.failUnlessReallyEqual(got_uri.strip(), child.get_uri())
2715            self.failUnlessReallyEqual(got_uri.strip(), child.get_readonly_uri())
2716        d.addCallback(_check)
2717        return d
2718
2719    def failUnlessCHKURIHasContents(self, got_uri, contents):
2720        if isinstance(got_uri, str):
2721            got_uri = got_uri.encode("utf-8")
2722        self.failUnless(self.get_all_contents()[got_uri] == contents)
2723
2724    def test_POST_upload(self):
2725        d = self.POST(self.public_url + "/foo", t="upload",
2726                      file=("new.txt", self.NEWFILE_CONTENTS))
2727        fn = self._foo_node
2728        d.addCallback(self.failUnlessURIMatchesROChild, fn, u"new.txt")
2729        d.addCallback(lambda res:
2730                      self.failUnlessChildContentsAre(fn, u"new.txt",
2731                                                      self.NEWFILE_CONTENTS))
2732        return d
2733
2734    def test_POST_upload_unicode(self):
2735        filename = u"n\u00e9wer.txt" # n e-acute w e r . t x t
2736        d = self.POST(self.public_url + "/foo", t="upload",
2737                      file=(filename, self.NEWFILE_CONTENTS))
2738        fn = self._foo_node
2739        d.addCallback(self.failUnlessURIMatchesROChild, fn, filename)
2740        d.addCallback(lambda res:
2741                      self.failUnlessChildContentsAre(fn, filename,
2742                                                      self.NEWFILE_CONTENTS))
2743        target_url = self.public_url + u"/foo/" + filename
2744        d.addCallback(lambda res: self.GET(target_url))
2745        d.addCallback(lambda contents: self.failUnlessReallyEqual(contents,
2746                                                                  self.NEWFILE_CONTENTS,
2747                                                                  contents))
2748        return d
2749
2750    def test_POST_upload_unicode_named(self):
2751        filename = u"n\u00e9wer.txt" # n e-acute w e r . t x t
2752        d = self.POST(self.public_url + "/foo", t="upload",
2753                      name=filename,
2754                      file=("overridden", self.NEWFILE_CONTENTS))
2755        fn = self._foo_node
2756        d.addCallback(self.failUnlessURIMatchesROChild, fn, filename)
2757        d.addCallback(lambda res:
2758                      self.failUnlessChildContentsAre(fn, filename,
2759                                                      self.NEWFILE_CONTENTS))
2760        target_url = self.public_url + u"/foo/" + filename
2761        d.addCallback(lambda res: self.GET(target_url))
2762        d.addCallback(lambda contents: self.failUnlessReallyEqual(contents,
2763                                                                  self.NEWFILE_CONTENTS,
2764                                                                  contents))
2765        return d
2766
2767    def test_POST_upload_no_link(self):
2768        d = self.POST("/uri", t="upload",
2769                      file=("new.txt", self.NEWFILE_CONTENTS))
2770        def _check_upload_results(page):
2771            page = str(page, "utf-8")
2772            # this should be a page which describes the results of the upload
2773            # that just finished.
2774            self.failUnlessIn("Upload Results:", page)
2775            self.failUnlessIn("URI:", page)
2776            uri_re = re.compile("URI: <tt><span>(.*)</span>")
2777            mo = uri_re.search(page)
2778            self.failUnless(mo, page)
2779            new_uri = mo.group(1)
2780            return new_uri
2781        d.addCallback(_check_upload_results)
2782        d.addCallback(self.failUnlessCHKURIHasContents, self.NEWFILE_CONTENTS)
2783        return d
2784
2785    @inlineCallbacks
2786    def test_POST_upload_no_link_whendone(self):
2787        body, headers = self.build_form(t="upload", when_done="/",
2788                                        file=("new.txt", self.NEWFILE_CONTENTS))
2789        yield self.shouldRedirectTo(self.webish_url + "/uri",
2790                                    self.webish_url + "/",
2791                                    method="post", data=body, headers=headers,
2792                                    code=http.FOUND)
2793
2794    @inlineCallbacks
2795    def test_POST_upload_no_link_whendone_results(self):
2796        # We encode "uri" as "%75ri" to exercise a case affected by ticket #1860
2797        body, headers = self.build_form(t="upload",
2798                                        when_done="/%75ri/%(uri)s",
2799                                        file=("new.txt", self.NEWFILE_CONTENTS),
2800                                        )
2801        redir_url = yield self.shouldRedirectTo(self.webish_url + "/uri", None,
2802                                                method="post",
2803                                                data=body, headers=headers,
2804                                                code=http.FOUND)
2805        res = yield do_http("get", redir_url)
2806        self.failUnlessReallyEqual(res, self.NEWFILE_CONTENTS)
2807
2808    def test_POST_upload_no_link_mutable(self):
2809        d = self.POST("/uri", t="upload", mutable="true",
2810                      file=("new.txt", self.NEWFILE_CONTENTS))
2811        def _check(filecap):
2812            filecap = filecap.strip()
2813            self.failUnless(filecap.startswith(b"URI:SSK:"), filecap)
2814            self.filecap = filecap
2815            u = uri.WriteableSSKFileURI.init_from_string(filecap)
2816            self.failUnlessIn(u.get_storage_index(), self.get_all_contents())
2817            n = self.s.create_node_from_uri(filecap)
2818            return n.download_best_version()
2819        d.addCallback(_check)
2820        def _check2(data):
2821            self.failUnlessReallyEqual(data, self.NEWFILE_CONTENTS)
2822            return self.GET("/uri/%s" % urlquote(self.filecap))
2823        d.addCallback(_check2)
2824        def _check3(data):
2825            self.failUnlessReallyEqual(data, self.NEWFILE_CONTENTS)
2826            return self.GET("/file/%s" % urlquote(self.filecap))
2827        d.addCallback(_check3)
2828        def _check4(data):
2829            self.failUnlessReallyEqual(data, self.NEWFILE_CONTENTS)
2830        d.addCallback(_check4)
2831        return d
2832
2833    def test_POST_upload_no_link_mutable_toobig(self):
2834        # The SDMF size limit is no longer in place, so we should be
2835        # able to upload mutable files that are as large as we want them
2836        # to be.
2837        d = self.POST("/uri", t="upload", mutable="true",
2838                      file=("new.txt", b"b" * (self.s.MUTABLE_SIZELIMIT + 1)))
2839        return d
2840
2841
2842    def test_POST_upload_format_unlinked(self):
2843        def _check_upload_unlinked(ign, format, uri_prefix):
2844            filename = format + ".txt"
2845            d = self.POST("/uri?t=upload&format=" + format,
2846                          file=(filename, self.NEWFILE_CONTENTS * 300000))
2847            def _got_results(results):
2848                if format.upper() in ("SDMF", "MDMF"):
2849                    # webapi.rst says this returns a filecap
2850                    filecap = results
2851                else:
2852                    # for immutable, it returns an "upload results page", and
2853                    # the filecap is buried inside
2854                    line = [l for l in results.split(b"\n") if b"URI: " in l][0]
2855                    mo = re.search(br'<span>([^<]+)</span>', line)
2856                    filecap = mo.group(1)
2857                self.failUnless(filecap.startswith(uri_prefix),
2858                                (uri_prefix, filecap))
2859                return self.GET("/uri/%s?t=json" % str(filecap, "utf-8"))
2860            d.addCallback(_got_results)
2861            def _got_json(raw):
2862                data = json.loads(raw)
2863                data = data[1]
2864                self.failUnlessIn("format", data)
2865                self.failUnlessEqual(data["format"], format.upper())
2866            d.addCallback(_got_json)
2867            return d
2868        d = defer.succeed(None)
2869        d.addCallback(_check_upload_unlinked, "chk", b"URI:CHK")
2870        d.addCallback(_check_upload_unlinked, "CHK", b"URI:CHK")
2871        d.addCallback(_check_upload_unlinked, "sdmf", b"URI:SSK")
2872        d.addCallback(_check_upload_unlinked, "mdmf", b"URI:MDMF")
2873        return d
2874
2875    @inlineCallbacks
2876    def test_POST_upload_bad_format_unlinked(self):
2877        url = self.webish_url + "/uri?t=upload&format=foo"
2878        body, headers = self.build_form(file=("foo.txt", self.NEWFILE_CONTENTS * 300000))
2879        yield self.assertHTTPError(url, 400,
2880                                   "Unknown format: foo",
2881                                   method="post", data=body, headers=headers)
2882
2883    async def test_POST_upload_keypair(self) -> None:
2884        """
2885        A *POST* creating a new mutable object may include a *private-key*
2886        query argument giving a urlsafe-base64-encoded RSA private key to use
2887        as the "signature key".  The given signature key is used, rather than
2888        a new one being generated.
2889        """
2890        format = "sdmf"
2891        priv, pub = create_signing_keypair(2048)
2892        encoded_privkey = urlsafe_b64encode(der_string_from_signing_key(priv)).decode("ascii")
2893        filename = "predetermined-sdmf"
2894        expected_content = self.NEWFILE_CONTENTS * 100
2895        actual_cap = uri.from_string(await self.POST(
2896            self.public_url +
2897            f"/foo?t=upload&format={format}&private-key={encoded_privkey}",
2898            file=(filename, expected_content),
2899        ))
2900        # Ideally we would inspect the private ("signature") and public
2901        # ("verification") keys but they are not made easily accessible here
2902        # (ostensibly because we have a FakeMutableFileNode instead of a real
2903        # one).
2904        #
2905        # So, instead, re-compute the writekey and fingerprint and compare
2906        # those against the capability string.
2907        expected_writekey, _, expected_fingerprint = derive_mutable_keys((pub, priv))
2908        self.assertEqual(
2909            (expected_writekey, expected_fingerprint),
2910            (actual_cap.writekey, actual_cap.fingerprint),
2911        )
2912
2913        # And the capability we got can be used to download the data we
2914        # uploaded.
2915        downloaded_content = await self.GET(f"/uri/{actual_cap.to_string().decode('ascii')}")
2916        self.assertEqual(expected_content, downloaded_content)
2917
2918    def test_POST_upload_format(self):
2919        def _check_upload(ign, format, uri_prefix, fn=None):
2920            filename = format + ".txt"
2921            d = self.POST(self.public_url +
2922                          "/foo?t=upload&format=" + format,
2923                          file=(filename, self.NEWFILE_CONTENTS * 300000))
2924            def _got_filecap(filecap):
2925                if fn is not None:
2926                    filenameu = str(filename)
2927                    self.failUnlessURIMatchesRWChild(filecap, fn, filenameu)
2928                self.failUnless(filecap.startswith(uri_prefix))
2929                return self.GET(self.public_url + "/foo/%s?t=json" % filename)
2930            d.addCallback(_got_filecap)
2931            def _got_json(raw):
2932                data = json.loads(raw)
2933                data = data[1]
2934                self.failUnlessIn("format", data)
2935                self.failUnlessEqual(data["format"], format.upper())
2936            d.addCallback(_got_json)
2937            return d
2938
2939        d = defer.succeed(None)
2940        d.addCallback(_check_upload, "chk", b"URI:CHK")
2941        d.addCallback(_check_upload, "sdmf", b"URI:SSK", self._foo_node)
2942        d.addCallback(_check_upload, "mdmf", b"URI:MDMF")
2943        d.addCallback(_check_upload, "MDMF", b"URI:MDMF")
2944        return d
2945
2946    @inlineCallbacks
2947    def test_POST_upload_bad_format(self):
2948        url = self.webish_url + self.public_url + "/foo?t=upload&format=foo"
2949        body, headers = self.build_form(file=("foo.txt", self.NEWFILE_CONTENTS * 300000))
2950        yield self.assertHTTPError(url, 400, "Unknown format: foo",
2951                                   method="post", data=body, headers=headers)
2952
2953    def test_POST_upload_mutable(self):
2954        # this creates a mutable file
2955        d = self.POST(self.public_url + "/foo", t="upload", mutable="true",
2956                      file=("new.txt", self.NEWFILE_CONTENTS))
2957        fn = self._foo_node
2958        d.addCallback(self.failUnlessURIMatchesRWChild, fn, u"new.txt")
2959        d.addCallback(lambda res:
2960                      self.failUnlessMutableChildContentsAre(fn, u"new.txt",
2961                                                             self.NEWFILE_CONTENTS))
2962        d.addCallback(lambda res: self._foo_node.get(u"new.txt"))
2963        def _got(newnode):
2964            self.failUnless(IMutableFileNode.providedBy(newnode))
2965            self.failUnless(newnode.is_mutable())
2966            self.failIf(newnode.is_readonly())
2967            self._mutable_node = newnode
2968            self._mutable_uri = newnode.get_uri()
2969        d.addCallback(_got)
2970
2971        # now upload it again and make sure that the URI doesn't change
2972        NEWER_CONTENTS = self.NEWFILE_CONTENTS + b"newer\n"
2973        d.addCallback(lambda res:
2974                      self.POST(self.public_url + "/foo", t="upload",
2975                                mutable="true",
2976                                file=("new.txt", NEWER_CONTENTS)))
2977        d.addCallback(self.failUnlessURIMatchesRWChild, fn, u"new.txt")
2978        d.addCallback(lambda res:
2979                      self.failUnlessMutableChildContentsAre(fn, u"new.txt",
2980                                                             NEWER_CONTENTS))
2981        d.addCallback(lambda res: self._foo_node.get(u"new.txt"))
2982        def _got2(newnode):
2983            self.failUnless(IMutableFileNode.providedBy(newnode))
2984            self.failUnless(newnode.is_mutable())
2985            self.failIf(newnode.is_readonly())
2986            self.failUnlessReallyEqual(self._mutable_uri, newnode.get_uri())
2987        d.addCallback(_got2)
2988
2989        # upload a second time, using PUT instead of POST
2990        NEW2_CONTENTS = NEWER_CONTENTS + b"overwrite with PUT\n"
2991        d.addCallback(lambda res:
2992                      self.PUT(self.public_url + "/foo/new.txt", NEW2_CONTENTS))
2993        d.addCallback(self.failUnlessURIMatchesRWChild, fn, u"new.txt")
2994        d.addCallback(lambda res:
2995                      self.failUnlessMutableChildContentsAre(fn, u"new.txt",
2996                                                             NEW2_CONTENTS))
2997
2998        # finally list the directory, since mutable files are displayed
2999        # slightly differently
3000
3001        d.addCallback(lambda res:
3002                      self.GET(self.public_url + "/foo",
3003                               followRedirect=True))
3004        def _check_page(res):
3005            # TODO: assert more about the contents
3006            self.failUnlessIn(b"SSK", res)
3007            return res
3008        d.addCallback(_check_page)
3009
3010        d.addCallback(lambda res: self._foo_node.get(u"new.txt"))
3011        def _got3(newnode):
3012            self.failUnless(IMutableFileNode.providedBy(newnode))
3013            self.failUnless(newnode.is_mutable())
3014            self.failIf(newnode.is_readonly())
3015            self.failUnlessReallyEqual(self._mutable_uri, newnode.get_uri())
3016        d.addCallback(_got3)
3017
3018        # look at the JSON form of the enclosing directory
3019        d.addCallback(lambda res:
3020                      self.GET(self.public_url + "/foo?t=json",
3021                               followRedirect=True))
3022        def _check_page_json(res):
3023            parsed = json.loads(res)
3024            self.failUnlessEqual(parsed[0], "dirnode")
3025            children = dict( [(str(name),value)
3026                              for (name,value)
3027                              in list(parsed[1]["children"].items())] )
3028            self.failUnlessIn(u"new.txt", children)
3029            new_json = children[u"new.txt"]
3030            self.failUnlessEqual(new_json[0], "filenode")
3031            self.failUnless(new_json[1]["mutable"])
3032            self.failUnlessReallyEqual(to_bytes(new_json[1]["rw_uri"]), self._mutable_uri)
3033            ro_uri = self._mutable_node.get_readonly().to_string()
3034            self.failUnlessReallyEqual(to_bytes(new_json[1]["ro_uri"]), ro_uri)
3035        d.addCallback(_check_page_json)
3036
3037        # and the JSON form of the file
3038        d.addCallback(lambda res:
3039                      self.GET(self.public_url + "/foo/new.txt?t=json"))
3040        def _check_file_json(res):
3041            parsed = json.loads(res)
3042            self.failUnlessEqual(parsed[0], "filenode")
3043            self.failUnless(parsed[1]["mutable"])
3044            self.failUnlessReallyEqual(to_bytes(parsed[1]["rw_uri"]), self._mutable_uri)
3045            ro_uri = self._mutable_node.get_readonly().to_string()
3046            self.failUnlessReallyEqual(to_bytes(parsed[1]["ro_uri"]), ro_uri)
3047        d.addCallback(_check_file_json)
3048
3049        # and look at t=uri and t=readonly-uri
3050        d.addCallback(lambda res:
3051                      self.GET(self.public_url + "/foo/new.txt?t=uri"))
3052        d.addCallback(lambda res: self.failUnlessReallyEqual(res, self._mutable_uri))
3053        d.addCallback(lambda res:
3054                      self.GET(self.public_url + "/foo/new.txt?t=readonly-uri"))
3055        def _check_ro_uri(res):
3056            ro_uri = self._mutable_node.get_readonly().to_string()
3057            self.failUnlessReallyEqual(res, ro_uri)
3058        d.addCallback(_check_ro_uri)
3059
3060        # make sure we can get to it from /uri/URI
3061        d.addCallback(lambda res:
3062                      self.GET("/uri/%s" % urlquote(self._mutable_uri)))
3063        d.addCallback(lambda res:
3064                      self.failUnlessReallyEqual(res, NEW2_CONTENTS))
3065
3066        # and that HEAD computes the size correctly
3067        d.addCallback(lambda res:
3068                      self.HEAD(self.public_url + "/foo/new.txt",
3069                                return_response=True))
3070        def _got_headers(res_and_status_and_headers):
3071            (res, status, headers) = res_and_status_and_headers
3072            self.failUnlessReallyEqual(res, "")
3073            self.failUnlessReallyEqual(int(headers.getRawHeaders("content-length")[0]),
3074                                       len(NEW2_CONTENTS))
3075            self.failUnlessReallyEqual(headers.getRawHeaders("content-type"),
3076                                       ["text/plain"])
3077        d.addCallback(_got_headers)
3078
3079        # make sure that outdated size limits aren't enforced anymore.
3080        d.addCallback(lambda ignored:
3081            self.POST(self.public_url + "/foo", t="upload",
3082                      mutable="true",
3083                      file=("new.txt",
3084                            b"b" * (self.s.MUTABLE_SIZELIMIT+1))))
3085        d.addErrback(self.dump_error)
3086        return d
3087
3088    def test_POST_upload_mutable_toobig(self):
3089        # SDMF had a size limti that was removed a while ago. MDMF has
3090        # never had a size limit. Test to make sure that we do not
3091        # encounter errors when trying to upload large mutable files,
3092        # since there should be no coded prohibitions regarding large
3093        # mutable files.
3094        d = self.POST(self.public_url + "/foo",
3095                      t="upload", mutable="true",
3096                      file=("new.txt", b"b" * (self.s.MUTABLE_SIZELIMIT + 1)))
3097        return d
3098
3099    def dump_error(self, f):
3100        # if the web server returns an error code (like 400 Bad Request),
3101        # web.client.getPage puts the HTTP response body into the .response
3102        # attribute of the exception object that it gives back. It does not
3103        # appear in the Failure's repr(), so the ERROR that trial displays
3104        # will be rather terse and unhelpful. addErrback this method to the
3105        # end of your chain to get more information out of these errors.
3106        if f.check(error.Error):
3107            print("web.error.Error:")
3108            print(f)
3109            print(f.value.response)
3110        return f
3111
3112    def test_POST_upload_replace(self):
3113        d = self.POST(self.public_url + "/foo", t="upload",
3114                      file=("bar.txt", self.NEWFILE_CONTENTS))
3115        fn = self._foo_node
3116        d.addCallback(self.failUnlessURIMatchesROChild, fn, u"bar.txt")
3117        d.addCallback(lambda res:
3118                      self.failUnlessChildContentsAre(fn, u"bar.txt",
3119                                                      self.NEWFILE_CONTENTS))
3120        return d
3121
3122    def test_POST_upload_no_replace_ok(self):
3123        d = self.POST(self.public_url + "/foo?replace=false", t="upload",
3124                      file=("new.txt", self.NEWFILE_CONTENTS))
3125        d.addCallback(lambda res: self.GET(self.public_url + "/foo/new.txt"))
3126        d.addCallback(lambda res: self.failUnlessReallyEqual(res,
3127                                                             self.NEWFILE_CONTENTS))
3128        return d
3129
3130    def test_POST_upload_no_replace_queryarg(self):
3131        d = self.POST(self.public_url + "/foo?replace=false", t="upload",
3132                      file=("bar.txt", self.NEWFILE_CONTENTS))
3133        d.addBoth(self.shouldFail, error.Error,
3134                  "POST_upload_no_replace_queryarg",
3135                  "409 Conflict",
3136                  "There was already a child by that name, and you asked me "
3137                  "to not replace it")
3138        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
3139        d.addCallback(self.failUnlessIsBarDotTxt)
3140        return d
3141
3142    def test_POST_upload_no_replace_field(self):
3143        d = self.POST(self.public_url + "/foo", t="upload", replace="false",
3144                      file=("bar.txt", self.NEWFILE_CONTENTS))
3145        d.addBoth(self.shouldFail, error.Error, "POST_upload_no_replace_field",
3146                  "409 Conflict",
3147                  "There was already a child by that name, and you asked me "
3148                  "to not replace it")
3149        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
3150        d.addCallback(self.failUnlessIsBarDotTxt)
3151        return d
3152
3153    @inlineCallbacks
3154    def test_POST_upload_whendone(self):
3155        body, headers = self.build_form(t="upload", when_done="/THERE",
3156                                        file=("new.txt", self.NEWFILE_CONTENTS))
3157        yield self.shouldRedirectTo(self.webish_url + self.public_url + "/foo",
3158                                    "/THERE",
3159                                    method="post", data=body, headers=headers,
3160                                    code=http.FOUND)
3161        fn = self._foo_node
3162        yield self.failUnlessChildContentsAre(fn, u"new.txt",
3163                                              self.NEWFILE_CONTENTS)
3164
3165    def test_POST_upload_named(self):
3166        NEWFILE_CONTENTS = self.NEWFILE_CONTENTS + b"\xFF\x00\xFF"
3167        fn = self._foo_node
3168        d = self.POST(self.public_url + "/foo", t="upload",
3169                      name="new.txt", file=NEWFILE_CONTENTS)
3170        d.addCallback(self.failUnlessURIMatchesROChild, fn, u"new.txt")
3171        d.addCallback(lambda res:
3172                      self.failUnlessChildContentsAre(fn, u"new.txt",
3173                                                      NEWFILE_CONTENTS))
3174        return d
3175
3176    def test_POST_upload_named_badfilename(self):
3177        d = self.POST(self.public_url + "/foo", t="upload",
3178                      name="slashes/are/bad.txt", file=self.NEWFILE_CONTENTS)
3179        d.addBoth(self.shouldFail, error.Error,
3180                  "test_POST_upload_named_badfilename",
3181                  "400 Bad Request",
3182                  "name= may not contain a slash",
3183                  )
3184        # make sure that nothing was added
3185        d.addCallback(lambda res:
3186                      self.failUnlessNodeKeysAre(self._foo_node,
3187                                                 [self._htmlname_unicode,
3188                                                  u"bar.txt", u"baz.txt", u"blockingfile",
3189                                                  u"empty", u"n\u00fc.txt", u"quux.txt",
3190                                                  u"sub"]))
3191        return d
3192
3193    @inlineCallbacks
3194    def test_POST_FILEURL_check(self):
3195        bar_url = self.public_url + "/foo/bar.txt"
3196        res = yield self.POST(bar_url, t="check")
3197        self.failUnlessIn(b"Healthy :", res)
3198
3199        redir_url = "http://allmydata.org/TARGET"
3200        body, headers = self.build_form(t="check", when_done=redir_url)
3201        yield self.shouldRedirectTo(self.webish_url + bar_url, redir_url,
3202                                    method="post", data=body, headers=headers,
3203                                    code=http.FOUND)
3204
3205        res = yield self.POST(bar_url, t="check", return_to=redir_url)
3206        res = str(res, "utf-8")
3207        self.failUnlessIn("Healthy :", res)
3208        self.failUnlessIn("Return to file", res)
3209        self.failUnlessIn(redir_url, res)
3210
3211        res = yield self.POST(bar_url, t="check", output="JSON")
3212        data = json.loads(res)
3213        self.failUnlessIn("storage-index", data)
3214        self.failUnless(data["results"]["healthy"])
3215
3216    @inlineCallbacks
3217    def test_POST_FILEURL_check_and_repair(self):
3218        bar_url = self.public_url + "/foo/bar.txt"
3219        res = yield self.POST(bar_url, t="check", repair="true")
3220        self.failUnlessIn(b"Healthy :", res)
3221
3222        redir_url = "http://allmydata.org/TARGET"
3223        body, headers = self.build_form(t="check", repair="true",
3224                                        when_done=redir_url)
3225        yield self.shouldRedirectTo(self.webish_url + bar_url, redir_url,
3226                                    method="post", data=body, headers=headers,
3227                                    code=http.FOUND)
3228
3229        res = yield self.POST(bar_url, t="check", return_to=redir_url)
3230        res = str(res, "utf-8")
3231        self.failUnlessIn("Healthy :", res)
3232        self.failUnlessIn("Return to file", res)
3233        self.failUnlessIn(redir_url, res)
3234
3235    @inlineCallbacks
3236    def test_POST_DIRURL_check(self):
3237        foo_url = self.public_url + "/foo"
3238        res = yield self.POST(foo_url, t="check")
3239        self.failUnlessIn(b"Healthy :", res)
3240
3241        redir_url = "http://allmydata.org/TARGET"
3242        body, headers = self.build_form(t="check", when_done=redir_url)
3243        yield self.shouldRedirectTo(self.webish_url + foo_url, redir_url,
3244                                    method="post", data=body, headers=headers,
3245                                    code=http.FOUND)
3246
3247        res = yield self.POST(foo_url, t="check", return_to=redir_url)
3248        res = str(res, "utf-8")
3249        self.failUnlessIn("Healthy :", res)
3250        self.failUnlessIn("Return to file/directory", res)
3251        self.failUnlessIn(redir_url, res)
3252
3253        res = yield self.POST(foo_url, t="check", output="JSON")
3254        data = json.loads(res)
3255        self.failUnlessIn("storage-index", data)
3256        self.failUnless(data["results"]["healthy"])
3257
3258    @inlineCallbacks
3259    def test_POST_DIRURL_check_and_repair(self):
3260        foo_url = self.public_url + "/foo"
3261        res = yield self.POST(foo_url, t="check", repair="true")
3262        self.failUnlessIn(b"Healthy :", res)
3263
3264        redir_url = "http://allmydata.org/TARGET"
3265        body, headers = self.build_form(t="check", repair="true",
3266                                        when_done=redir_url)
3267        yield self.shouldRedirectTo(self.webish_url + foo_url, redir_url,
3268                                    method="post", data=body, headers=headers,
3269                                    code=http.FOUND)
3270        res = yield self.POST(foo_url, t="check", return_to=redir_url)
3271        res = str(res, "utf-8")
3272        self.failUnlessIn("Healthy :", res)
3273        self.failUnlessIn("Return to file/directory", res)
3274        self.failUnlessIn(redir_url, res)
3275
3276    def test_POST_FILEURL_mdmf_check(self):
3277        quux_url = "/uri/%s" % urlquote(self._quux_txt_uri)
3278        d = self.POST(quux_url, t="check")
3279        def _check(res):
3280            self.failUnlessIn(b"Healthy", res)
3281        d.addCallback(_check)
3282        quux_extension_url = "/uri/%s" % urlquote("%s:3:131073" % str(self._quux_txt_uri, "utf-8"))
3283        d.addCallback(lambda ignored:
3284                      self.POST(quux_extension_url, t="check"))
3285        d.addCallback(_check)
3286        return d
3287
3288    def test_POST_FILEURL_mdmf_check_and_repair(self):
3289        quux_url = "/uri/%s" % urlquote(self._quux_txt_uri)
3290        d = self.POST(quux_url, t="check", repair="true")
3291        def _check(res):
3292            self.failUnlessIn(b"Healthy", res)
3293        d.addCallback(_check)
3294        quux_extension_url = "/uri/%s" % urlquote("%s:3:131073" % str(self._quux_txt_uri, "ascii"))
3295        d.addCallback(lambda ignored:
3296                      self.POST(quux_extension_url, t="check", repair="true"))
3297        d.addCallback(_check)
3298        return d
3299
3300    def wait_for_operation(self, ignored, ophandle):
3301        url = "/operations/" + ophandle
3302        url += "?t=status&output=JSON"
3303        d = self.GET(url)
3304        def _got(res):
3305            data = json.loads(res)
3306            if not data["finished"]:
3307                d = self.stall(delay=1.0)
3308                d.addCallback(self.wait_for_operation, ophandle)
3309                return d
3310            return data
3311        d.addCallback(_got)
3312        return d
3313
3314    def get_operation_results(self, ignored, ophandle, output=None):
3315        url = "/operations/" + ophandle
3316        url += "?t=status"
3317        if output:
3318            url += "&output=" + output
3319        d = self.GET(url)
3320        def _got(res):
3321            if output and output.lower() == "json":
3322                return json.loads(res)
3323            return res
3324        d.addCallback(_got)
3325        return d
3326
3327    def test_POST_DIRURL_deepcheck_no_ophandle(self):
3328        d = self.shouldFail2(error.Error,
3329                             "test_POST_DIRURL_deepcheck_no_ophandle",
3330                             "400 Bad Request",
3331                             "slow operation requires ophandle=",
3332                             self.POST, self.public_url, t="start-deep-check")
3333        return d
3334
3335    @inlineCallbacks
3336    def test_POST_DIRURL_deepcheck(self):
3337        body, headers = self.build_form(t="start-deep-check", ophandle="123")
3338        yield self.shouldRedirectTo(self.webish_url + self.public_url,
3339                                    self.webish_url + "/operations/123",
3340                                    method="post", data=body, headers=headers,
3341                                    code=http.FOUND)
3342
3343        data = yield self.wait_for_operation(None, "123")
3344        self.failUnlessReallyEqual(data["finished"], True)
3345        self.failUnlessReallyEqual(data["count-objects-checked"], 11)
3346        self.failUnlessReallyEqual(data["count-objects-healthy"], 11)
3347
3348        res = yield self.get_operation_results(None, "123", "html")
3349        self.failUnlessIn(b"Objects Checked: <span>11</span>", res)
3350        self.failUnlessIn(b"Objects Healthy: <span>11</span>", res)
3351        soup = BeautifulSoup(res, 'html5lib')
3352        assert_soup_has_favicon(self, soup)
3353
3354        res = yield self.GET("/operations/123/")
3355        # should be the same as without the slash
3356        self.failUnlessIn(b"Objects Checked: <span>11</span>", res)
3357        self.failUnlessIn(b"Objects Healthy: <span>11</span>", res)
3358        soup = BeautifulSoup(res, 'html5lib')
3359        assert_soup_has_favicon(self, soup)
3360
3361        yield self.shouldFail2(error.Error, "one", "404 Not Found",
3362                               "No detailed results for SI bogus",
3363                               self.GET, "/operations/123/bogus")
3364
3365        foo_si = self._foo_node.get_storage_index()
3366        foo_si_s = base32.b2a(foo_si)
3367        res = yield self.GET("/operations/123/%s?output=JSON" % str(foo_si_s, "ascii"))
3368        data = json.loads(res)
3369        self.failUnlessEqual(data["storage-index"], str(foo_si_s, "ascii"))
3370        self.failUnless(data["results"]["healthy"])
3371
3372    def test_POST_DIRURL_deepcheck_and_repair(self):
3373        url = self.webish_url + self.public_url
3374        body, headers = self.build_form(t="start-deep-check", repair="true",
3375                                        ophandle="124", output="json")
3376        d = do_http("post", url, data=body, headers=headers,
3377                    allow_redirects=True,
3378                    browser_like_redirects=True)
3379        d.addCallback(self.wait_for_operation, "124")
3380        def _check_json(data):
3381            self.failUnlessReallyEqual(data["finished"], True)
3382            self.failUnlessReallyEqual(data["count-objects-checked"], 11)
3383            self.failUnlessReallyEqual(data["count-objects-healthy-pre-repair"], 11)
3384            self.failUnlessReallyEqual(data["count-objects-unhealthy-pre-repair"], 0)
3385            self.failUnlessReallyEqual(data["count-corrupt-shares-pre-repair"], 0)
3386            self.failUnlessReallyEqual(data["count-repairs-attempted"], 0)
3387            self.failUnlessReallyEqual(data["count-repairs-successful"], 0)
3388            self.failUnlessReallyEqual(data["count-repairs-unsuccessful"], 0)
3389            self.failUnlessReallyEqual(data["count-objects-healthy-post-repair"], 11)
3390            self.failUnlessReallyEqual(data["count-objects-unhealthy-post-repair"], 0)
3391            self.failUnlessReallyEqual(data["count-corrupt-shares-post-repair"], 0)
3392        d.addCallback(_check_json)
3393        d.addCallback(self.get_operation_results, "124", "html")
3394        def _check_html(res):
3395            res = str(res, "utf-8")
3396            self.failUnlessIn("Objects Checked: <span>11</span>", res)
3397
3398            self.failUnlessIn("Objects Healthy (before repair): <span>11</span>", res)
3399            self.failUnlessIn("Objects Unhealthy (before repair): <span>0</span>", res)
3400            self.failUnlessIn("Corrupt Shares (before repair): <span>0</span>", res)
3401
3402            self.failUnlessIn("Repairs Attempted: <span>0</span>", res)
3403            self.failUnlessIn("Repairs Successful: <span>0</span>", res)
3404            self.failUnlessIn("Repairs Unsuccessful: <span>0</span>", res)
3405
3406            self.failUnlessIn("Objects Healthy (after repair): <span>11</span>", res)
3407            self.failUnlessIn("Objects Unhealthy (after repair): <span>0</span>", res)
3408            self.failUnlessIn("Corrupt Shares (after repair): <span>0</span>", res)
3409
3410            soup = BeautifulSoup(res, 'html5lib')
3411            assert_soup_has_favicon(self, soup)
3412        d.addCallback(_check_html)
3413        return d
3414
3415    def test_POST_FILEURL_bad_t(self):
3416        d = self.shouldFail2(error.Error, "POST_bad_t", "400 Bad Request",
3417                             "POST to file: bad t=bogus",
3418                             self.POST, self.public_url + "/foo/bar.txt",
3419                             t="bogus")
3420        return d
3421
3422    def test_POST_mkdir(self): # return value?
3423        d = self.POST(self.public_url + "/foo", t="mkdir", name="newdir")
3424        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3425        d.addCallback(self.failUnlessNodeKeysAre, [])
3426        return d
3427
3428    def test_POST_mkdir_mdmf(self):
3429        d = self.POST(self.public_url + "/foo?t=mkdir&name=newdir&format=mdmf")
3430        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3431        d.addCallback(lambda node:
3432            self.failUnlessEqual(node._node.get_version(), MDMF_VERSION))
3433        return d
3434
3435    def test_POST_mkdir_sdmf(self):
3436        d = self.POST(self.public_url + "/foo?t=mkdir&name=newdir&format=sdmf")
3437        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3438        d.addCallback(lambda node:
3439            self.failUnlessEqual(node._node.get_version(), SDMF_VERSION))
3440        return d
3441
3442    @inlineCallbacks
3443    def test_POST_mkdir_bad_format(self):
3444        url = (self.webish_url + self.public_url +
3445               "/foo?t=mkdir&name=newdir&format=foo")
3446        yield self.assertHTTPError(url, 400, "Unknown format: foo",
3447                                   method="post")
3448
3449    def test_POST_mkdir_initial_children(self):
3450        (newkids, caps) = self._create_initial_children()
3451        d = self.POST2(self.public_url +
3452                       "/foo?t=mkdir-with-children&name=newdir",
3453                       json.dumps(newkids))
3454        d.addCallback(lambda res:
3455                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
3456        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3457        d.addCallback(self.failUnlessNodeKeysAre, list(newkids.keys()))
3458        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3459        d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
3460        return d
3461
3462    def test_POST_mkdir_initial_children_mdmf(self):
3463        (newkids, caps) = self._create_initial_children()
3464        d = self.POST2(self.public_url +
3465                       "/foo?t=mkdir-with-children&name=newdir&format=mdmf",
3466                       json.dumps(newkids))
3467        d.addCallback(lambda res:
3468                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
3469        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3470        d.addCallback(lambda node:
3471            self.failUnlessEqual(node._node.get_version(), MDMF_VERSION))
3472        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3473        d.addCallback(self.failUnlessROChildURIIs, u"child-imm",
3474                       caps['filecap1'])
3475        return d
3476
3477    # XXX: Duplication.
3478    def test_POST_mkdir_initial_children_sdmf(self):
3479        (newkids, caps) = self._create_initial_children()
3480        d = self.POST2(self.public_url +
3481                       "/foo?t=mkdir-with-children&name=newdir&format=sdmf",
3482                       json.dumps(newkids))
3483        d.addCallback(lambda res:
3484                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
3485        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3486        d.addCallback(lambda node:
3487            self.failUnlessEqual(node._node.get_version(), SDMF_VERSION))
3488        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3489        d.addCallback(self.failUnlessROChildURIIs, u"child-imm",
3490                       caps['filecap1'])
3491        return d
3492
3493    @inlineCallbacks
3494    def test_POST_mkdir_initial_children_bad_format(self):
3495        (newkids, caps) = self._create_initial_children()
3496        url = (self.webish_url + self.public_url +
3497               "/foo?t=mkdir-with-children&name=newdir&format=foo")
3498        yield self.assertHTTPError(url, 400, "Unknown format: foo",
3499                                   method="post", data=json.dumps(newkids).encode("utf-8"))
3500
3501    def test_POST_mkdir_immutable(self):
3502        (newkids, caps) = self._create_immutable_children()
3503        d = self.POST2(self.public_url +
3504                       "/foo?t=mkdir-immutable&name=newdir",
3505                       json.dumps(newkids))
3506        d.addCallback(lambda res:
3507                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
3508        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3509        d.addCallback(self.failUnlessNodeKeysAre, list(newkids.keys()))
3510        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3511        d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
3512        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3513        d.addCallback(self.failUnlessROChildURIIs, u"unknownchild-imm", caps['unknown_immcap'])
3514        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3515        d.addCallback(self.failUnlessROChildURIIs, u"dirchild-imm", caps['immdircap'])
3516        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3517        d.addCallback(self.failUnlessROChildURIIs, u"dirchild-lit", caps['litdircap'])
3518        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3519        d.addCallback(self.failUnlessROChildURIIs, u"dirchild-empty", caps['emptydircap'])
3520        return d
3521
3522    def test_POST_mkdir_immutable_bad(self):
3523        (newkids, caps) = self._create_initial_children()
3524        d = self.shouldFail2(error.Error, "POST_mkdir_immutable_bad",
3525                             "400 Bad Request",
3526                             "needed to be immutable but was not",
3527                             self.POST2,
3528                             self.public_url +
3529                             "/foo?t=mkdir-immutable&name=newdir",
3530                             json.dumps(newkids))
3531        return d
3532
3533    def test_POST_mkdir_2(self):
3534        d = self.POST2(self.public_url + "/foo/newdir?t=mkdir", "")
3535        d.addCallback(lambda res:
3536                      self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
3537        d.addCallback(lambda res: self._foo_node.get(u"newdir"))
3538        d.addCallback(self.failUnlessNodeKeysAre, [])
3539        return d
3540
3541    def test_POST_mkdirs_2(self):
3542        d = self.POST2(self.public_url + "/foo/bardir/newdir?t=mkdir", "")
3543        d.addCallback(lambda res:
3544                      self.failUnlessNodeHasChild(self._foo_node, u"bardir"))
3545        d.addCallback(lambda res: self._foo_node.get(u"bardir"))
3546        d.addCallback(lambda bardirnode: bardirnode.get(u"newdir"))
3547        d.addCallback(self.failUnlessNodeKeysAre, [])
3548        return d
3549
3550    def test_POST_mkdir_no_parentdir_noredirect(self):
3551        d = self.POST("/uri?t=mkdir")
3552        def _after_mkdir(res):
3553            uri.DirectoryURI.init_from_string(res)
3554        d.addCallback(_after_mkdir)
3555        return d
3556
3557    def test_POST_mkdir_no_parentdir_noredirect_mdmf(self):
3558        d = self.POST("/uri?t=mkdir&format=mdmf")
3559        def _after_mkdir(res):
3560            u = uri.from_string(res)
3561            # Check that this is an MDMF writecap
3562            self.failUnlessIsInstance(u, uri.MDMFDirectoryURI)
3563        d.addCallback(_after_mkdir)
3564        return d
3565
3566    def test_POST_mkdir_no_parentdir_noredirect_sdmf(self):
3567        d = self.POST("/uri?t=mkdir&format=sdmf")
3568        def _after_mkdir(res):
3569            u = uri.from_string(res)
3570            self.failUnlessIsInstance(u, uri.DirectoryURI)
3571        d.addCallback(_after_mkdir)
3572        return d
3573
3574    @inlineCallbacks
3575    def test_POST_mkdir_no_parentdir_noredirect_bad_format(self):
3576        url = self.webish_url + self.public_url + "/uri?t=mkdir&format=foo"
3577        yield self.assertHTTPError(url, 400, "Unknown format: foo",
3578                                   method="post")
3579
3580    def test_POST_mkdir_no_parentdir_noredirect2(self):
3581        # make sure form-based arguments (as on the welcome page) still work
3582        d = self.POST("/uri", t="mkdir")
3583        def _after_mkdir(res):
3584            uri.DirectoryURI.init_from_string(res)
3585        d.addCallback(_after_mkdir)
3586        d.addErrback(self.explain_web_error)
3587        return d
3588
3589    @inlineCallbacks
3590    def test_POST_mkdir_no_parentdir_redirect(self):
3591        url = self.webish_url + "/uri?t=mkdir&redirect_to_result=true"
3592        target = yield self.shouldRedirectTo(url, None, method="post",
3593                                             code=http.SEE_OTHER)
3594        target = urlunquote(str(target, "ascii"))
3595        self.failUnless(target.startswith("uri/URI:DIR2:"), target)
3596
3597    @inlineCallbacks
3598    def test_POST_mkdir_no_parentdir_redirect2(self):
3599        body, headers = self.build_form(t="mkdir", redirect_to_result="true")
3600        target = yield self.shouldRedirectTo(self.webish_url + "/uri", None,
3601                                             method="post",
3602                                             data=body, headers=headers,
3603                                             code=http.SEE_OTHER)
3604        target = urlunquote(str(target, "ascii"))
3605        self.failUnless(target.startswith("uri/URI:DIR2:"), target)
3606
3607    def _make_readonly(self, u):
3608        ro_uri = uri.from_string(u).get_readonly()
3609        if ro_uri is None:
3610            return None
3611        return ro_uri.to_string()
3612
3613    def _create_initial_children(self):
3614        contents, n, filecap1 = self.makefile(12)
3615        md1 = {"metakey1": "metavalue1"}
3616        filecap2 = make_mutable_file_uri()
3617        node3 = self.s.create_node_from_uri(make_mutable_file_uri())
3618        filecap3 = node3.get_readonly_uri()
3619        node4 = self.s.create_node_from_uri(make_mutable_file_uri())
3620        dircap = DirectoryNode(node4, None, None).get_uri()
3621        mdmfcap = make_mutable_file_uri(mdmf=True)
3622        litdircap = "URI:DIR2-LIT:ge3dumj2mewdcotyfqydulbshj5x2lbm"
3623        emptydircap = "URI:DIR2-LIT:"
3624        newkids = {u"child-imm":        ["filenode", {"rw_uri": filecap1,
3625                                                      "ro_uri": self._make_readonly(filecap1),
3626                                                      "metadata": md1, }],
3627                   u"child-mutable":    ["filenode", {"rw_uri": filecap2,
3628                                                      "ro_uri": self._make_readonly(filecap2)}],
3629                   u"child-mutable-ro": ["filenode", {"ro_uri": filecap3}],
3630                   u"unknownchild-rw":  ["unknown",  {"rw_uri": unknown_rwcap,
3631                                                      "ro_uri": unknown_rocap}],
3632                   u"unknownchild-ro":  ["unknown",  {"ro_uri": unknown_rocap}],
3633                   u"unknownchild-imm": ["unknown",  {"ro_uri": unknown_immcap}],
3634                   u"dirchild":         ["dirnode",  {"rw_uri": dircap,
3635                                                      "ro_uri": self._make_readonly(dircap)}],
3636                   u"dirchild-lit":     ["dirnode",  {"ro_uri": litdircap}],
3637                   u"dirchild-empty":   ["dirnode",  {"ro_uri": emptydircap}],
3638                   u"child-mutable-mdmf": ["filenode", {"rw_uri": mdmfcap,
3639                                                        "ro_uri": self._make_readonly(mdmfcap)}],
3640                   }
3641        return newkids, {'filecap1': filecap1,
3642                         'filecap2': filecap2,
3643                         'filecap3': filecap3,
3644                         'unknown_rwcap': unknown_rwcap,
3645                         'unknown_rocap': unknown_rocap,
3646                         'unknown_immcap': unknown_immcap,
3647                         'dircap': dircap,
3648                         'litdircap': litdircap,
3649                         'emptydircap': emptydircap,
3650                         'mdmfcap': mdmfcap}
3651
3652    def _create_immutable_children(self):
3653        contents, n, filecap1 = self.makefile(12)
3654        md1 = {"metakey1": "metavalue1"}
3655        tnode = create_chk_filenode("immutable directory contents\n"*10,
3656                                    self.get_all_contents())
3657        dnode = DirectoryNode(tnode, None, None)
3658        assert not dnode.is_mutable()
3659        immdircap = dnode.get_uri()
3660        litdircap = "URI:DIR2-LIT:ge3dumj2mewdcotyfqydulbshj5x2lbm"
3661        emptydircap = "URI:DIR2-LIT:"
3662        newkids = {u"child-imm":        ["filenode", {"ro_uri": filecap1,
3663                                                      "metadata": md1, }],
3664                   u"unknownchild-imm": ["unknown",  {"ro_uri": unknown_immcap}],
3665                   u"dirchild-imm":     ["dirnode",  {"ro_uri": immdircap}],
3666                   u"dirchild-lit":     ["dirnode",  {"ro_uri": litdircap}],
3667                   u"dirchild-empty":   ["dirnode",  {"ro_uri": emptydircap}],
3668                   }
3669        return newkids, {'filecap1': filecap1,
3670                         'unknown_immcap': unknown_immcap,
3671                         'immdircap': immdircap,
3672                         'litdircap': litdircap,
3673                         'emptydircap': emptydircap}
3674
3675    def test_POST_mkdir_no_parentdir_initial_children(self):
3676        (newkids, caps) = self._create_initial_children()
3677        d = self.POST2("/uri?t=mkdir-with-children", json.dumps(newkids))
3678        def _after_mkdir(res):
3679            self.failUnless(res.startswith(b"URI:DIR"), res)
3680            n = self.s.create_node_from_uri(res)
3681            d2 = self.failUnlessNodeKeysAre(n, list(newkids.keys()))
3682            d2.addCallback(lambda ign:
3683                           self.failUnlessROChildURIIs(n, u"child-imm",
3684                                                       caps['filecap1']))
3685            d2.addCallback(lambda ign:
3686                           self.failUnlessRWChildURIIs(n, u"child-mutable",
3687                                                       caps['filecap2']))
3688            d2.addCallback(lambda ign:
3689                           self.failUnlessROChildURIIs(n, u"child-mutable-ro",
3690                                                       caps['filecap3']))
3691            d2.addCallback(lambda ign:
3692                           self.failUnlessRWChildURIIs(n, u"unknownchild-rw",
3693                                                       caps['unknown_rwcap']))
3694            d2.addCallback(lambda ign:
3695                           self.failUnlessROChildURIIs(n, u"unknownchild-ro",
3696                                                       caps['unknown_rocap']))
3697            d2.addCallback(lambda ign:
3698                           self.failUnlessROChildURIIs(n, u"unknownchild-imm",
3699                                                       caps['unknown_immcap']))
3700            d2.addCallback(lambda ign:
3701                           self.failUnlessRWChildURIIs(n, u"dirchild",
3702                                                       caps['dircap']))
3703            return d2
3704        d.addCallback(_after_mkdir)
3705        return d
3706
3707    @inlineCallbacks
3708    def test_POST_mkdir_no_parentdir_unexpected_children(self):
3709        # the regular /uri?t=mkdir operation is specified to ignore its body.
3710        # Only t=mkdir-with-children pays attention to it.
3711        (newkids, caps) = self._create_initial_children()
3712        url = self.webish_url + "/uri?t=mkdir" # without children
3713        yield self.assertHTTPError(url, 400,
3714                                   "t=mkdir does not accept children=, "
3715                                   "try t=mkdir-with-children instead",
3716                                   method="post", data=json.dumps(newkids).encode("utf-8"))
3717
3718    @inlineCallbacks
3719    def test_POST_noparent_bad(self):
3720        url = self.webish_url + "/uri?t=bogus"
3721        yield self.assertHTTPError(url, 400,
3722                                   "/uri accepts only PUT, PUT?t=mkdir, "
3723                                   "POST?t=upload, and POST?t=mkdir",
3724                                   method="post")
3725
3726    def test_POST_mkdir_no_parentdir_immutable(self):
3727        (newkids, caps) = self._create_immutable_children()
3728        d = self.POST2("/uri?t=mkdir-immutable", json.dumps(newkids))
3729        def _after_mkdir(res):
3730            self.failUnless(res.startswith(b"URI:DIR"), res)
3731            n = self.s.create_node_from_uri(res)
3732            d2 = self.failUnlessNodeKeysAre(n, list(newkids.keys()))
3733            d2.addCallback(lambda ign:
3734                           self.failUnlessROChildURIIs(n, u"child-imm",
3735                                                          caps['filecap1']))
3736            d2.addCallback(lambda ign:
3737                           self.failUnlessROChildURIIs(n, u"unknownchild-imm",
3738                                                          caps['unknown_immcap']))
3739            d2.addCallback(lambda ign:
3740                           self.failUnlessROChildURIIs(n, u"dirchild-imm",
3741                                                          caps['immdircap']))
3742            d2.addCallback(lambda ign:
3743                           self.failUnlessROChildURIIs(n, u"dirchild-lit",
3744                                                          caps['litdircap']))
3745            d2.addCallback(lambda ign:
3746                           self.failUnlessROChildURIIs(n, u"dirchild-empty",
3747                                                          caps['emptydircap']))
3748            return d2
3749        d.addCallback(_after_mkdir)
3750        return d
3751
3752    def test_POST_mkdir_no_parentdir_immutable_bad(self):
3753        (newkids, caps) = self._create_initial_children()
3754        d = self.shouldFail2(error.Error,
3755                             "test_POST_mkdir_no_parentdir_immutable_bad",
3756                             "400 Bad Request",
3757                             "needed to be immutable but was not",
3758                             self.POST2,
3759                             "/uri?t=mkdir-immutable",
3760                             json.dumps(newkids))
3761        return d
3762
3763    @inlineCallbacks
3764    def test_welcome_page_mkdir_button(self):
3765        # Fetch the welcome page.
3766        res = yield self.GET("/")
3767        res = str(res, "utf-8")
3768        MKDIR_BUTTON_RE = re.compile(
3769            '<form(?: action="([^"]*)"| method="post"| enctype="multipart/form-data"){3}>.*'
3770            '<input (?:type="hidden" |name="t" |value="([^"]*?)" ){3}/>[ ]*'
3771            '<input (?:type="hidden" |name="([^"]*)" |value="([^"]*)" ){3}/>[ ]*'
3772            '<input (type="submit" |class="btn" |value="Create a directory[^"]*" ){3}/>')
3773        html = res.replace('\n', ' ')
3774        mo = MKDIR_BUTTON_RE.search(html)
3775        self.failUnless(mo, html)
3776        formaction = mo.group(1)
3777        formt = mo.group(2)
3778        formaname = mo.group(3)
3779        formavalue = mo.group(4)
3780
3781        url = self.webish_url + "/%s?t=%s&%s=%s" % (formaction, formt,
3782                                                    formaname, formavalue)
3783        target = yield self.shouldRedirectTo(url, None,
3784                                             method="post",
3785                                             code=http.SEE_OTHER)
3786        target = urlunquote(str(target, "utf-8"))
3787        self.failUnless(target.startswith("uri/URI:DIR2:"), target)
3788
3789    def test_POST_mkdir_replace(self): # return value?
3790        d = self.POST(self.public_url + "/foo", t="mkdir", name="sub")
3791        d.addCallback(lambda res: self._foo_node.get(u"sub"))
3792        d.addCallback(self.failUnlessNodeKeysAre, [])
3793        return d
3794
3795    def test_POST_mkdir_no_replace_queryarg(self): # return value?
3796        d = self.POST(self.public_url + "/foo?replace=false", t="mkdir", name="sub")
3797        d.addBoth(self.shouldFail, error.Error,
3798                  "POST_mkdir_no_replace_queryarg",
3799                  "409 Conflict",
3800                  "There was already a child by that name, and you asked me "
3801                  "to not replace it")
3802        d.addCallback(lambda res: self._foo_node.get(u"sub"))
3803        d.addCallback(self.failUnlessNodeKeysAre, [u"baz.txt"])
3804        return d
3805
3806    def test_POST_mkdir_no_replace_field(self): # return value?
3807        d = self.POST(self.public_url + "/foo", t="mkdir", name="sub",
3808                      replace="false")
3809        d.addBoth(self.shouldFail, error.Error, "POST_mkdir_no_replace_field",
3810                  "409 Conflict",
3811                  "There was already a child by that name, and you asked me "
3812                  "to not replace it")
3813        d.addCallback(lambda res: self._foo_node.get(u"sub"))
3814        d.addCallback(self.failUnlessNodeKeysAre, [u"baz.txt"])
3815        return d
3816
3817    @inlineCallbacks
3818    def test_POST_mkdir_whendone_field(self):
3819        body, headers = self.build_form(t="mkdir", name="newdir",
3820                                        when_done="/THERE")
3821        yield self.shouldRedirectTo(self.webish_url + self.public_url + "/foo",
3822                                    "/THERE",
3823                                    method="post", data=body, headers=headers,
3824                                    code=http.FOUND)
3825        res = yield self._foo_node.get(u"newdir")
3826        self.failUnlessNodeKeysAre(res, [])
3827
3828    @inlineCallbacks
3829    def test_POST_mkdir_whendone_queryarg(self):
3830        body, headers = self.build_form(t="mkdir", name="newdir")
3831        url = self.webish_url + self.public_url + "/foo?when_done=/THERE"
3832        yield self.shouldRedirectTo(url, "/THERE",
3833                                    method="post", data=body, headers=headers,
3834                                    code=http.FOUND)
3835        res = yield self._foo_node.get(u"newdir")
3836        self.failUnlessNodeKeysAre(res, [])
3837
3838    def test_POST_bad_t(self):
3839        d = self.shouldFail2(error.Error, "POST_bad_t",
3840                             "400 Bad Request",
3841                             "POST to a directory with bad t=BOGUS",
3842                             self.POST, self.public_url + "/foo", t="BOGUS")
3843        return d
3844
3845    def test_POST_set_children(self, command_name="set_children"):
3846        contents9, n9, newuri9 = self.makefile(9)
3847        contents10, n10, newuri10 = self.makefile(10)
3848        contents11, n11, newuri11 = self.makefile(11)
3849
3850        reqbody = b"""{
3851                     "atomic_added_1": [ "filenode", { "rw_uri": "%s",
3852                                                "size": 0,
3853                                                "metadata": {
3854                                                  "ctime": 1002777696.7564139,
3855                                                  "mtime": 1002777696.7564139
3856                                                 }
3857                                               } ],
3858                     "atomic_added_2": [ "filenode", { "rw_uri": "%s",
3859                                                "size": 1,
3860                                                "metadata": {
3861                                                  "ctime": 1002777696.7564139,
3862                                                  "mtime": 1002777696.7564139
3863                                                 }
3864                                               } ],
3865                     "atomic_added_3": [ "filenode", { "rw_uri": "%s",
3866                                                "size": 2,
3867                                                "metadata": {
3868                                                  "ctime": 1002777696.7564139,
3869                                                  "mtime": 1002777696.7564139
3870                                                 }
3871                                               } ]
3872                    }""" % (newuri9, newuri10, newuri11)
3873
3874        url = self.webish_url + self.public_url + "/foo" + "?t=" + command_name
3875
3876        d = do_http("post", url, data=reqbody)
3877        def _then(res):
3878            self.failUnlessURIMatchesROChild(newuri9, self._foo_node, u"atomic_added_1")
3879            self.failUnlessURIMatchesROChild(newuri10, self._foo_node, u"atomic_added_2")
3880            self.failUnlessURIMatchesROChild(newuri11, self._foo_node, u"atomic_added_3")
3881
3882        d.addCallback(_then)
3883        d.addErrback(self.dump_error)
3884        return d
3885
3886    def test_POST_set_children_with_hyphen(self):
3887        return self.test_POST_set_children(command_name="set-children")
3888
3889    def test_POST_link_uri(self):
3890        contents, n, newuri = self.makefile(8)
3891        d = self.POST(self.public_url + "/foo", t="uri", name="new.txt", uri=newuri)
3892        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"new.txt")
3893        d.addCallback(lambda res:
3894                      self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
3895                                                      contents))
3896        return d
3897
3898    def test_POST_link_uri_replace(self):
3899        contents, n, newuri = self.makefile(8)
3900        d = self.POST(self.public_url + "/foo", t="uri", name="bar.txt", uri=newuri)
3901        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"bar.txt")
3902        d.addCallback(lambda res:
3903                      self.failUnlessChildContentsAre(self._foo_node, u"bar.txt",
3904                                                      contents))
3905        return d
3906
3907    def test_POST_link_uri_unknown_bad(self):
3908        d = self.POST(self.public_url + "/foo", t="uri", name="future.txt", uri=unknown_rwcap)
3909        d.addBoth(self.shouldFail, error.Error,
3910                  "POST_link_uri_unknown_bad",
3911                  "400 Bad Request",
3912                  "unknown cap in a write slot")
3913        return d
3914
3915    def test_POST_link_uri_unknown_ro_good(self):
3916        d = self.POST(self.public_url + "/foo", t="uri", name="future-ro.txt", uri=unknown_rocap)
3917        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"future-ro.txt")
3918        return d
3919
3920    def test_POST_link_uri_unknown_imm_good(self):
3921        d = self.POST(self.public_url + "/foo", t="uri", name="future-imm.txt", uri=unknown_immcap)
3922        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"future-imm.txt")
3923        return d
3924
3925    def test_POST_link_uri_no_replace_queryarg(self):
3926        contents, n, newuri = self.makefile(8)
3927        d = self.POST(self.public_url + "/foo?replace=false", t="uri",
3928                      name="bar.txt", uri=newuri)
3929        d.addBoth(self.shouldFail, error.Error,
3930                  "POST_link_uri_no_replace_queryarg",
3931                  "409 Conflict",
3932                  "There was already a child by that name, and you asked me "
3933                  "to not replace it")
3934        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
3935        d.addCallback(self.failUnlessIsBarDotTxt)
3936        return d
3937
3938    def test_POST_link_uri_no_replace_field(self):
3939        contents, n, newuri = self.makefile(8)
3940        d = self.POST(self.public_url + "/foo", t="uri", replace="false",
3941                      name="bar.txt", uri=newuri)
3942        d.addBoth(self.shouldFail, error.Error,
3943                  "POST_link_uri_no_replace_field",
3944                  "409 Conflict",
3945                  "There was already a child by that name, and you asked me "
3946                  "to not replace it")
3947        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
3948        d.addCallback(self.failUnlessIsBarDotTxt)
3949        return d
3950
3951    def test_POST_delete(self, command_name='delete'):
3952        d = self._foo_node.list()
3953        def _check_before(children):
3954            self.failUnlessIn(u"bar.txt", children)
3955        d.addCallback(_check_before)
3956        d.addCallback(lambda res: self.POST(self.public_url + "/foo", t=command_name, name="bar.txt"))
3957        d.addCallback(lambda res: self._foo_node.list())
3958        def _check_after(children):
3959            self.failIfIn(u"bar.txt", children)
3960        d.addCallback(_check_after)
3961        return d
3962
3963    def test_POST_unlink(self):
3964        return self.test_POST_delete(command_name='unlink')
3965
3966    def test_POST_rename_file(self):
3967        d = self.POST(self.public_url + "/foo", t="rename",
3968                      from_name="bar.txt", to_name='wibble.txt')
3969        d.addCallback(lambda res:
3970                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
3971        d.addCallback(lambda res:
3972                      self.failUnlessNodeHasChild(self._foo_node, u"wibble.txt"))
3973        d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt"))
3974        d.addCallback(self.failUnlessIsBarDotTxt)
3975        d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt?t=json"))
3976        d.addCallback(self.failUnlessIsBarJSON)
3977        return d
3978
3979    def test_POST_rename_file_redundant(self):
3980        d = self.POST(self.public_url + "/foo", t="rename",
3981                      from_name="bar.txt", to_name='bar.txt')
3982        d.addCallback(lambda res:
3983                      self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
3984        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
3985        d.addCallback(self.failUnlessIsBarDotTxt)
3986        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
3987        d.addCallback(self.failUnlessIsBarJSON)
3988        return d
3989
3990    def test_POST_rename_file_replace(self):
3991        # rename a file and replace a directory with it
3992        d = self.POST(self.public_url + "/foo", t="rename",
3993                      from_name="bar.txt", to_name='empty')
3994        d.addCallback(lambda res:
3995                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
3996        d.addCallback(lambda res:
3997                      self.failUnlessNodeHasChild(self._foo_node, u"empty"))
3998        d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty"))
3999        d.addCallback(self.failUnlessIsBarDotTxt)
4000        d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
4001        d.addCallback(self.failUnlessIsBarJSON)
4002        return d
4003
4004    def test_POST_rename_file_no_replace_queryarg(self):
4005        # rename a file and replace a directory with it
4006        d = self.POST(self.public_url + "/foo?replace=false", t="rename",
4007                      from_name="bar.txt", to_name='empty')
4008        d.addBoth(self.shouldFail, error.Error,
4009                  "POST_rename_file_no_replace_queryarg",
4010                  "409 Conflict",
4011                  "There was already a child by that name, and you asked me "
4012                  "to not replace it")
4013        d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
4014        d.addCallback(self.failUnlessIsEmptyJSON)
4015        return d
4016
4017    def test_POST_rename_file_no_replace_field(self):
4018        # rename a file and replace a directory with it
4019        d = self.POST(self.public_url + "/foo", t="rename", replace="false",
4020                      from_name="bar.txt", to_name='empty')
4021        d.addBoth(self.shouldFail, error.Error,
4022                  "POST_rename_file_no_replace_field",
4023                  "409 Conflict",
4024                  "There was already a child by that name, and you asked me "
4025                  "to not replace it")
4026        d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
4027        d.addCallback(self.failUnlessIsEmptyJSON)
4028        return d
4029
4030    def test_POST_rename_file_no_replace_same_link(self):
4031        d = self.POST(self.public_url + "/foo", t="rename",
4032                      replace="false", from_name="bar.txt", to_name="bar.txt")
4033        d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4034        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4035        d.addCallback(self.failUnlessIsBarDotTxt)
4036        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4037        d.addCallback(self.failUnlessIsBarJSON)
4038        return d
4039
4040    def test_POST_rename_file_replace_only_files(self):
4041        d = self.POST(self.public_url + "/foo", t="rename",
4042                      replace="only-files", from_name="bar.txt",
4043                      to_name="baz.txt")
4044        d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4045        d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
4046        d.addCallback(self.failUnlessIsBarDotTxt)
4047        d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt?t=json"))
4048        d.addCallback(self.failUnlessIsBarJSON)
4049        return d
4050
4051    def test_POST_rename_file_replace_only_files_conflict(self):
4052        d = self.shouldFail2(error.Error, "POST_relink_file_replace_only_files_conflict",
4053                             "409 Conflict",
4054                             "There was already a child by that name, and you asked me to not replace it.",
4055                             self.POST, self.public_url + "/foo", t="relink",
4056                             replace="only-files", from_name="bar.txt",
4057                             to_name="empty")
4058        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4059        d.addCallback(self.failUnlessIsBarDotTxt)
4060        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4061        d.addCallback(self.failUnlessIsBarJSON)
4062        return d
4063
4064    def failUnlessIsEmptyJSON(self, res):
4065        data = json.loads(res)
4066        self.failUnlessEqual(data[0], "dirnode", data)
4067        self.failUnlessReallyEqual(len(data[1]["children"]), 0)
4068
4069    def test_POST_rename_file_to_slash_fail(self):
4070        d = self.POST(self.public_url + "/foo", t="rename",
4071                      from_name="bar.txt", to_name='kirk/spock.txt')
4072        d.addBoth(self.shouldFail, error.Error,
4073                  "test_POST_rename_file_to_slash_fail",
4074                  "400 Bad Request",
4075                  "to_name= may not contain a slash",
4076                  )
4077        d.addCallback(lambda res:
4078                      self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4079        return d
4080
4081    def test_POST_rename_file_from_slash_fail(self):
4082        d = self.POST(self.public_url + "/foo", t="rename",
4083                      from_name="sub/bar.txt", to_name='spock.txt')
4084        d.addBoth(self.shouldFail, error.Error,
4085                  "test_POST_rename_from_file_slash_fail",
4086                  "400 Bad Request",
4087                  "from_name= may not contain a slash",
4088                  )
4089        d.addCallback(lambda res:
4090                      self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4091        return d
4092
4093    def test_POST_rename_dir(self):
4094        d = self.POST(self.public_url, t="rename",
4095                      from_name="foo", to_name='plunk')
4096        d.addCallback(lambda res:
4097                      self.failIfNodeHasChild(self.public_root, u"foo"))
4098        d.addCallback(lambda res:
4099                      self.failUnlessNodeHasChild(self.public_root, u"plunk"))
4100        d.addCallback(lambda res: self.GET(self.public_url + "/plunk?t=json"))
4101        d.addCallback(self.failUnlessIsFooJSON)
4102        return d
4103
4104    def test_POST_relink_file(self):
4105        d = self.POST(self.public_url + "/foo", t="relink",
4106                      from_name="bar.txt",
4107                      to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4108        d.addCallback(lambda res:
4109                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4110        d.addCallback(lambda res:
4111                      self.failUnlessNodeHasChild(self._sub_node, u"bar.txt"))
4112        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt"))
4113        d.addCallback(self.failUnlessIsBarDotTxt)
4114        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt?t=json"))
4115        d.addCallback(self.failUnlessIsBarJSON)
4116        return d
4117
4118    def test_POST_relink_file_new_name(self):
4119        d = self.POST(self.public_url + "/foo", t="relink",
4120                      from_name="bar.txt",
4121                      to_name="wibble.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4122        d.addCallback(lambda res:
4123                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4124        d.addCallback(lambda res:
4125                      self.failIfNodeHasChild(self._sub_node, u"bar.txt"))
4126        d.addCallback(lambda res:
4127                      self.failUnlessNodeHasChild(self._sub_node, u"wibble.txt"))
4128        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/wibble.txt"))
4129        d.addCallback(self.failUnlessIsBarDotTxt)
4130        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/wibble.txt?t=json"))
4131        d.addCallback(self.failUnlessIsBarJSON)
4132        return d
4133
4134    def test_POST_relink_file_replace(self):
4135        d = self.POST(self.public_url + "/foo", t="relink",
4136                      from_name="bar.txt",
4137                      to_name="baz.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4138        d.addCallback(lambda res:
4139                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4140        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
4141        d.addCallback(self.failUnlessIsBarDotTxt)
4142        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt?t=json"))
4143        d.addCallback(self.failUnlessIsBarJSON)
4144        return d
4145
4146    def test_POST_relink_file_no_replace(self):
4147        d = self.shouldFail2(error.Error, "POST_relink_file_no_replace",
4148                             "409 Conflict",
4149                             "There was already a child by that name, and you asked me to not replace it",
4150                             self.POST, self.public_url + "/foo", t="relink",
4151                             replace="false", from_name="bar.txt",
4152                             to_name="baz.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4153        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4154        d.addCallback(self.failUnlessIsBarDotTxt)
4155        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4156        d.addCallback(self.failUnlessIsBarJSON)
4157        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
4158        d.addCallback(self.failUnlessIsSubBazDotTxt)
4159        return d
4160
4161    def test_POST_relink_file_no_replace_explicitly_same_link(self):
4162        d = self.POST(self.public_url + "/foo", t="relink",
4163                      replace="false", from_name="bar.txt",
4164                      to_name="bar.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo")
4165        d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4166        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4167        d.addCallback(self.failUnlessIsBarDotTxt)
4168        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4169        d.addCallback(self.failUnlessIsBarJSON)
4170        return d
4171
4172    def test_POST_relink_file_replace_only_files(self):
4173        d = self.POST(self.public_url + "/foo", t="relink",
4174                      replace="only-files", from_name="bar.txt",
4175                      to_name="baz.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4176        d.addCallback(lambda res:
4177                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4178        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
4179        d.addCallback(self.failUnlessIsBarDotTxt)
4180        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt?t=json"))
4181        d.addCallback(self.failUnlessIsBarJSON)
4182        return d
4183
4184    def test_POST_relink_file_replace_only_files_conflict(self):
4185        d = self.shouldFail2(error.Error, "POST_relink_file_replace_only_files_conflict",
4186                             "409 Conflict",
4187                             "There was already a child by that name, and you asked me to not replace it.",
4188                             self.POST, self.public_url + "/foo", t="relink",
4189                             replace="only-files", from_name="bar.txt",
4190                             to_name="sub", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo")
4191        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4192        d.addCallback(self.failUnlessIsBarDotTxt)
4193        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4194        d.addCallback(self.failUnlessIsBarJSON)
4195        return d
4196
4197    def test_POST_relink_file_to_slash_fail(self):
4198        d = self.shouldFail2(error.Error, "test_POST_rename_file_slash_fail",
4199                             "400 Bad Request",
4200                             "to_name= may not contain a slash",
4201                             self.POST, self.public_url + "/foo", t="relink",
4202                             from_name="bar.txt",
4203                             to_name="slash/fail.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4204        d.addCallback(lambda res:
4205                      self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4206        d.addCallback(lambda res:
4207                      self.failIfNodeHasChild(self._sub_node, u"slash/fail.txt"))
4208        d.addCallback(lambda ign:
4209                      self.shouldFail2(error.Error,
4210                                       "test_POST_rename_file_slash_fail2",
4211                                       "400 Bad Request",
4212                                       "from_name= may not contain a slash",
4213                                       self.POST, self.public_url + "/foo",
4214                                       t="relink",
4215                                       from_name="nope/bar.txt",
4216                                       to_name="fail.txt",
4217                                       to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub"))
4218        return d
4219
4220    def test_POST_relink_file_explicitly_same_link(self):
4221        d = self.POST(self.public_url + "/foo", t="relink",
4222                      from_name="bar.txt",
4223                      to_name="bar.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo")
4224        d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4225        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4226        d.addCallback(self.failUnlessIsBarDotTxt)
4227        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4228        d.addCallback(self.failUnlessIsBarJSON)
4229        return d
4230
4231    def test_POST_relink_file_implicitly_same_link(self):
4232        d = self.POST(self.public_url + "/foo", t="relink",
4233                      from_name="bar.txt")
4234        d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
4235        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4236        d.addCallback(self.failUnlessIsBarDotTxt)
4237        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4238        d.addCallback(self.failUnlessIsBarJSON)
4239        return d
4240
4241    def test_POST_relink_file_same_dir(self):
4242        d = self.POST(self.public_url + "/foo", t="relink",
4243                      from_name="bar.txt",
4244                      to_name="baz.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo")
4245        d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4246        d.addCallback(lambda res: self.failUnlessNodeHasChild(self._sub_node, u"baz.txt"))
4247        d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
4248        d.addCallback(self.failUnlessIsBarDotTxt)
4249        d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt?t=json"))
4250        d.addCallback(self.failUnlessIsBarJSON)
4251        return d
4252
4253    def test_POST_relink_file_bad_replace(self):
4254        d = self.shouldFail2(error.Error, "test_POST_relink_file_bad_replace",
4255                             "400 Bad Request", "invalid replace= argument: 'boogabooga'",
4256                             self.POST,
4257                             self.public_url + "/foo", t="relink",
4258                             replace="boogabooga", from_name="bar.txt",
4259                             to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub")
4260        return d
4261
4262    def test_POST_relink_file_multi_level(self):
4263        d = self.POST2(self.public_url + "/foo/sub/level2?t=mkdir", "")
4264        d.addCallback(lambda res: self.POST(self.public_url + "/foo", t="relink",
4265                      from_name="bar.txt", to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/sub/level2"))
4266        d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4267        d.addCallback(lambda res: self.failIfNodeHasChild(self._sub_node, u"bar.txt"))
4268        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/level2/bar.txt"))
4269        d.addCallback(self.failUnlessIsBarDotTxt)
4270        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/level2/bar.txt?t=json"))
4271        d.addCallback(self.failUnlessIsBarJSON)
4272        return d
4273
4274    def test_POST_relink_file_to_uri(self):
4275        d = self.POST(self.public_url + "/foo", t="relink", target_type="uri",
4276                      from_name="bar.txt", to_dir=self._sub_uri)
4277        d.addCallback(lambda res:
4278                      self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
4279        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt"))
4280        d.addCallback(self.failUnlessIsBarDotTxt)
4281        d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt?t=json"))
4282        d.addCallback(self.failUnlessIsBarJSON)
4283        return d
4284
4285    def test_POST_relink_file_to_nonexistent_dir(self):
4286        d = self.shouldFail2(error.Error, "POST_relink_file_to_nonexistent_dir",
4287                            "404 Not Found", "No such child: nopechucktesta",
4288                            self.POST, self.public_url + "/foo", t="relink",
4289                            from_name="bar.txt",
4290                            to_dir=str(self.public_root.get_uri(), "utf-8") + "/nopechucktesta")
4291        return d
4292
4293    def test_POST_relink_file_into_file(self):
4294        d = self.shouldFail2(error.Error, "POST_relink_file_into_file",
4295                             "400 Bad Request", "to_dir is not a directory",
4296                             self.POST, self.public_url + "/foo", t="relink",
4297                             from_name="bar.txt",
4298                             to_dir=str(self.public_root.get_uri(), "utf-8") + "/foo/baz.txt")
4299        d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
4300        d.addCallback(self.failUnlessIsBazDotTxt)
4301        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4302        d.addCallback(self.failUnlessIsBarDotTxt)
4303        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4304        d.addCallback(self.failUnlessIsBarJSON)
4305        return d
4306
4307    def test_POST_relink_file_to_bad_uri(self):
4308        d =  self.shouldFail2(error.Error, "POST_relink_file_to_bad_uri",
4309                              "400 Bad Request", "to_dir is not a directory",
4310                              self.POST, self.public_url + "/foo", t="relink",
4311                              from_name="bar.txt",
4312                              to_dir="URI:DIR2:mn5jlyjnrjeuydyswlzyui72i:rmneifcj6k6sycjljjhj3f6majsq2zqffydnnul5hfa4j577arma")
4313        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
4314        d.addCallback(self.failUnlessIsBarDotTxt)
4315        d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
4316        d.addCallback(self.failUnlessIsBarJSON)
4317        return d
4318
4319    def test_POST_relink_dir(self):
4320        d = self.POST(self.public_url + "/foo", t="relink",
4321                      from_name="bar.txt",
4322                      to_dir=str(self.public_root.get_uri(), "ascii") + "/foo/empty")
4323        d.addCallback(lambda res: self.POST(self.public_url + "/foo",
4324                      t="relink", from_name="empty",
4325                      to_dir=str(self.public_root.get_uri(), "ascii") + "/foo/sub"))
4326        d.addCallback(lambda res:
4327                      self.failIfNodeHasChild(self._foo_node, u"empty"))
4328        d.addCallback(lambda res:
4329                      self.failUnlessNodeHasChild(self._sub_node, u"empty"))
4330        d.addCallback(lambda res:
4331                      self._sub_node.get_child_at_path(u"empty"))
4332        d.addCallback(lambda node:
4333                      self.failUnlessNodeHasChild(node, u"bar.txt"))
4334        d.addCallback(lambda res:
4335                      self.GET(self.public_url + "/foo/sub/empty/bar.txt"))
4336        d.addCallback(self.failUnlessIsBarDotTxt)
4337        return d
4338
4339    @inlineCallbacks
4340    def shouldRedirectTo(self, url, target_location, method="get",
4341                         code=None, **args):
4342        response = yield treq.request(method, url, persistent=False,
4343                                      allow_redirects=False, **args)
4344        codes = [http.MOVED_PERMANENTLY,
4345                 http.FOUND,
4346                 http.TEMPORARY_REDIRECT,
4347                 ] if code is None else [code]
4348        yield response.content()
4349        self.assertIn(response.code, codes)
4350        location = response.headers.getRawHeaders(b"location")[0]
4351        if target_location is not None:
4352            self.assertEquals(str(location, "ascii"), target_location)
4353        returnValue(location)
4354
4355    @inlineCallbacks
4356    def test_GET_URI_form(self):
4357        relbase = "/uri?uri=%s" % str(self._bar_txt_uri, "ascii")
4358        base = self.webish_url + relbase
4359        # this is supposed to give us a redirect to /uri/$URI, plus arguments
4360        targetbase = self.webish_url + "/uri/%s" % urlquote(self._bar_txt_uri)
4361        yield self.shouldRedirectTo(base, targetbase)
4362        yield self.shouldRedirectTo(base+"&filename=bar.txt",
4363                                    targetbase+"?filename=bar.txt")
4364        yield self.shouldRedirectTo(base+"&t=json",
4365                                    targetbase+"?t=json")
4366
4367        self.log(None, "about to get file by uri")
4368        data = yield self.GET(relbase, followRedirect=True)
4369        self.failUnlessIsBarDotTxt(data)
4370        self.log(None, "got file by uri, about to get dir by uri")
4371        data = yield self.GET("/uri?uri=%s&t=json" % str(self._foo_uri, "ascii"),
4372                              followRedirect=True)
4373        self.failUnlessIsFooJSON(data)
4374        self.log(None, "got dir by uri")
4375
4376    def test_GET_URI_form_bad(self):
4377        d = self.shouldFail2(error.Error, "test_GET_URI_form_bad",
4378                             "400 Bad Request", "GET /uri requires uri=",
4379                             self.GET, "/uri")
4380        return d
4381
4382    @inlineCallbacks
4383    def test_GET_rename_form(self):
4384        data = yield self.GET(
4385            self.public_url + "/foo?t=rename-form&name=bar.txt",
4386            followRedirect=True
4387        )
4388        soup = BeautifulSoup(data, 'html5lib')
4389        assert_soup_has_favicon(self, soup)
4390        assert_soup_has_tag_with_attributes(
4391            self, soup, u"input",
4392            {u"name": u"when_done", u"value": u".", u"type": u"hidden"},
4393        )
4394        assert_soup_has_tag_with_attributes(
4395            self, soup, u"input",
4396            {u"readonly": u"true", u"name": u"from_name", u"value": u"bar.txt", u"type": u"text"},
4397        )
4398
4399    def log(self, res, msg):
4400        #print("MSG: %s  RES: %s" % (msg, res))
4401        log.msg(msg)
4402        return res
4403
4404    def test_GET_URI_URL(self):
4405        base = "/uri/%s" % str(self._bar_txt_uri, "ascii")
4406        d = self.GET(base)
4407        d.addCallback(self.failUnlessIsBarDotTxt)
4408        d.addCallback(lambda res: self.GET(base+"?filename=bar.txt"))
4409        d.addCallback(self.failUnlessIsBarDotTxt)
4410        d.addCallback(lambda res: self.GET(base+"?filename=bar.txt&save=true"))
4411        d.addCallback(self.failUnlessIsBarDotTxt)
4412        return d
4413
4414    def test_GET_URI_URL_dir(self):
4415        base = "/uri/%s?t=json" % str(self._foo_uri, "ascii")
4416        d = self.GET(base)
4417        d.addCallback(self.failUnlessIsFooJSON)
4418        return d
4419
4420    @inlineCallbacks
4421    def test_GET_URI_URL_missing(self):
4422        base = "/uri/%s" % str(self._bad_file_uri, "ascii")
4423        url = self.webish_url + base
4424        yield self.assertHTTPError(url, http.GONE, "NotEnoughSharesError")
4425        # TODO: how can we exercise both sides of WebDownloadTarget.fail
4426        # here? we must arrange for a download to fail after target.open()
4427        # has been called, and then inspect the response to see that it is
4428        # shorter than we expected.
4429
4430    def test_PUT_DIRURL_uri(self):
4431        d = self.s.create_dirnode()
4432        def _made_dir(dn):
4433            new_uri = dn.get_uri()
4434            # replace /foo with a new (empty) directory
4435            d = self.PUT(self.public_url + "/foo?t=uri", new_uri)
4436            d.addCallback(lambda res:
4437                          self.failUnlessReallyEqual(res.strip(), new_uri))
4438            d.addCallback(lambda res:
4439                          self.failUnlessRWChildURIIs(self.public_root,
4440                                                      u"foo",
4441                                                      new_uri))
4442            return d
4443        d.addCallback(_made_dir)
4444        return d
4445
4446    def test_PUT_DIRURL_uri_noreplace(self):
4447        d = self.s.create_dirnode()
4448        def _made_dir(dn):
4449            new_uri = dn.get_uri()
4450            # replace /foo with a new (empty) directory, but ask that
4451            # replace=false, so it should fail
4452            d = self.shouldFail2(error.Error, "test_PUT_DIRURL_uri_noreplace",
4453                                 "409 Conflict", "There was already a child by that name, and you asked me to not replace it",
4454                                 self.PUT,
4455                                 self.public_url + "/foo?t=uri&replace=false",
4456                                 new_uri)
4457            d.addCallback(lambda res:
4458                          self.failUnlessRWChildURIIs(self.public_root,
4459                                                      u"foo",
4460                                                      self._foo_uri))
4461            return d
4462        d.addCallback(_made_dir)
4463        return d
4464
4465    def test_PUT_DIRURL_bad_t(self):
4466        d = self.shouldFail2(error.Error, "test_PUT_DIRURL_bad_t",
4467                             "400 Bad Request", "PUT to a directory",
4468                             self.PUT, self.public_url + "/foo?t=BOGUS", "")
4469        d.addCallback(lambda res:
4470                      self.failUnlessRWChildURIIs(self.public_root,
4471                                                  u"foo",
4472                                                  self._foo_uri))
4473        return d
4474
4475    def test_PUT_NEWFILEURL_uri(self):
4476        contents, n, new_uri = self.makefile(8)
4477        d = self.PUT(self.public_url + "/foo/new.txt?t=uri", new_uri)
4478        d.addCallback(lambda res: self.failUnlessReallyEqual(res.strip(), new_uri))
4479        d.addCallback(lambda res:
4480                      self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
4481                                                      contents))
4482        return d
4483
4484    def test_PUT_NEWFILEURL_mdmf(self):
4485        new_contents = self.NEWFILE_CONTENTS * 300000
4486        d = self.PUT(self.public_url + \
4487                     "/foo/mdmf.txt?format=mdmf",
4488                     new_contents)
4489        d.addCallback(lambda ignored:
4490            self.GET(self.public_url + "/foo/mdmf.txt?t=json"))
4491        def _got_json(raw):
4492            data = json.loads(raw)
4493            data = data[1]
4494            self.failUnlessIn("format", data)
4495            self.failUnlessEqual(data["format"], "MDMF")
4496            self.failUnless(data['rw_uri'].startswith("URI:MDMF"))
4497            self.failUnless(data['ro_uri'].startswith("URI:MDMF"))
4498        d.addCallback(_got_json)
4499        return d
4500
4501    def test_PUT_NEWFILEURL_sdmf(self):
4502        new_contents = self.NEWFILE_CONTENTS * 300000
4503        d = self.PUT(self.public_url + \
4504                     "/foo/sdmf.txt?format=sdmf",
4505                     new_contents)
4506        d.addCallback(lambda ignored:
4507            self.GET(self.public_url + "/foo/sdmf.txt?t=json"))
4508        def _got_json(raw):
4509            data = json.loads(raw)
4510            data = data[1]
4511            self.failUnlessIn("format", data)
4512            self.failUnlessEqual(data["format"], "SDMF")
4513        d.addCallback(_got_json)
4514        return d
4515
4516    @inlineCallbacks
4517    def test_PUT_NEWFILEURL_bad_format(self):
4518        new_contents = self.NEWFILE_CONTENTS * 300000
4519        url = self.webish_url + self.public_url + "/foo/foo.txt?format=foo"
4520        yield self.assertHTTPError(url, 400, "Unknown format: foo",
4521                                   method="put", data=new_contents)
4522
4523    def test_PUT_NEWFILEURL_uri_replace(self):
4524        contents, n, new_uri = self.makefile(8)
4525        d = self.PUT(self.public_url + "/foo/bar.txt?t=uri", new_uri)
4526        d.addCallback(lambda res: self.failUnlessReallyEqual(res.strip(), new_uri))
4527        d.addCallback(lambda res:
4528                      self.failUnlessChildContentsAre(self._foo_node, u"bar.txt",
4529                                                      contents))
4530        return d
4531
4532    def test_PUT_NEWFILEURL_uri_no_replace(self):
4533        contents, n, new_uri = self.makefile(8)
4534        d = self.PUT(self.public_url + "/foo/bar.txt?t=uri&replace=false", new_uri)
4535        d.addBoth(self.shouldFail, error.Error,
4536                  "PUT_NEWFILEURL_uri_no_replace",
4537                  "409 Conflict",
4538                  "There was already a child by that name, and you asked me "
4539                  "to not replace it")
4540        return d
4541
4542    def test_PUT_NEWFILEURL_uri_unknown_bad(self):
4543        d = self.PUT(self.public_url + "/foo/put-future.txt?t=uri", unknown_rwcap)
4544        d.addBoth(self.shouldFail, error.Error,
4545                  "POST_put_uri_unknown_bad",
4546                  "400 Bad Request",
4547                  "unknown cap in a write slot")
4548        return d
4549
4550    def test_PUT_NEWFILEURL_uri_unknown_ro_good(self):
4551        d = self.PUT(self.public_url + "/foo/put-future-ro.txt?t=uri", unknown_rocap)
4552        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node,
4553                      u"put-future-ro.txt")
4554        return d
4555
4556    def test_PUT_NEWFILEURL_uri_unknown_imm_good(self):
4557        d = self.PUT(self.public_url + "/foo/put-future-imm.txt?t=uri", unknown_immcap)
4558        d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node,
4559                      u"put-future-imm.txt")
4560        return d
4561
4562    def test_PUT_NEWFILE_URI(self):
4563        file_contents = b"New file contents here\n"
4564        d = self.PUT("/uri", file_contents)
4565        def _check(uri):
4566            assert isinstance(uri, bytes), uri
4567            self.failUnlessIn(uri, self.get_all_contents())
4568            self.failUnlessReallyEqual(self.get_all_contents()[uri],
4569                                       file_contents)
4570            return self.GET("/uri/%s" % str(uri, "utf-8"))
4571        d.addCallback(_check)
4572        def _check2(res):
4573            self.failUnlessReallyEqual(res, file_contents)
4574        d.addCallback(_check2)
4575        return d
4576
4577    def test_PUT_NEWFILE_URI_not_mutable(self):
4578        file_contents = b"New file contents here\n"
4579        d = self.PUT("/uri?mutable=false", file_contents)
4580        def _check(uri):
4581            assert isinstance(uri, bytes), uri
4582            self.failUnlessIn(uri, self.get_all_contents())
4583            self.failUnlessReallyEqual(self.get_all_contents()[uri],
4584                                       file_contents)
4585            return self.GET("/uri/%s" % str(uri, "utf-8"))
4586        d.addCallback(_check)
4587        def _check2(res):
4588            self.failUnlessReallyEqual(res, file_contents)
4589        d.addCallback(_check2)
4590        return d
4591
4592    def test_PUT_NEWFILE_URI_only_PUT(self):
4593        d = self.PUT("/uri?t=bogus", b"")
4594        d.addBoth(self.shouldFail, error.Error,
4595                  "PUT_NEWFILE_URI_only_PUT",
4596                  "400 Bad Request",
4597                  "/uri accepts only PUT, PUT?t=mkdir, POST?t=upload, and POST?t=mkdir")
4598        return d
4599
4600    def test_PUT_NEWFILE_URI_mutable(self):
4601        file_contents = b"New file contents here\n"
4602        d = self.PUT("/uri?mutable=true", file_contents)
4603        def _check1(filecap):
4604            filecap = filecap.strip()
4605            self.failUnless(filecap.startswith(b"URI:SSK:"), filecap)
4606            self.filecap = filecap
4607            u = uri.WriteableSSKFileURI.init_from_string(filecap)
4608            self.failUnlessIn(u.get_storage_index(), self.get_all_contents())
4609            n = self.s.create_node_from_uri(filecap)
4610            return n.download_best_version()
4611        d.addCallback(_check1)
4612        def _check2(data):
4613            self.failUnlessReallyEqual(data, file_contents)
4614            return self.GET("/uri/%s" % urlquote(str(self.filecap, "utf-8")))
4615        d.addCallback(_check2)
4616        def _check3(res):
4617            self.failUnlessReallyEqual(res, file_contents)
4618        d.addCallback(_check3)
4619        return d
4620
4621    def test_PUT_mkdir(self):
4622        d = self.PUT("/uri?t=mkdir", "")
4623        def _check(uri):
4624            n = self.s.create_node_from_uri(uri.strip())
4625            d2 = self.failUnlessNodeKeysAre(n, [])
4626            d2.addCallback(lambda res:
4627                           self.GET("/uri/%s?t=json" % str(uri, "utf-8")))
4628            return d2
4629        d.addCallback(_check)
4630        d.addCallback(self.failUnlessIsEmptyJSON)
4631        return d
4632
4633    def test_PUT_mkdir_mdmf(self):
4634        d = self.PUT("/uri?t=mkdir&format=mdmf", "")
4635        def _got(res):
4636            u = uri.from_string(res)
4637            # Check that this is an MDMF writecap
4638            self.failUnlessIsInstance(u, uri.MDMFDirectoryURI)
4639        d.addCallback(_got)
4640        return d
4641
4642    def test_PUT_mkdir_sdmf(self):
4643        d = self.PUT("/uri?t=mkdir&format=sdmf", "")
4644        def _got(res):
4645            u = uri.from_string(res)
4646            self.failUnlessIsInstance(u, uri.DirectoryURI)
4647        d.addCallback(_got)
4648        return d
4649
4650    @inlineCallbacks
4651    def test_PUT_mkdir_bad_format(self):
4652        url = self.webish_url + "/uri?t=mkdir&format=foo"
4653        yield self.assertHTTPError(url, 400, "Unknown format: foo",
4654                                   method="put", data=b"")
4655
4656    def test_POST_check(self):
4657        d = self.POST(self.public_url + "/foo", t="check", name="bar.txt")
4658        def _done(res):
4659            # this returns a string form of the results, which are probably
4660            # None since we're using fake filenodes.
4661            # TODO: verify that the check actually happened, by changing
4662            # FakeCHKFileNode to count how many times .check() has been
4663            # called.
4664            pass
4665        d.addCallback(_done)
4666        return d
4667
4668
4669    def test_PUT_update_at_offset(self):
4670        file_contents = b"test file" * 100000 # about 900 KiB
4671        d = self.PUT("/uri?mutable=true", file_contents)
4672        def _then(filecap):
4673            self.filecap = filecap
4674            new_data = file_contents[:100]
4675            new = b"replaced and so on"
4676            new_data += new
4677            new_data += file_contents[len(new_data):]
4678            assert len(new_data) == len(file_contents)
4679            self.new_data = new_data
4680        d.addCallback(_then)
4681        d.addCallback(lambda ignored:
4682            self.PUT("/uri/%s?replace=True&offset=100" % str(self.filecap, "utf-8"),
4683                     b"replaced and so on"))
4684        def _get_data(filecap):
4685            n = self.s.create_node_from_uri(filecap)
4686            return n.download_best_version()
4687        d.addCallback(_get_data)
4688        d.addCallback(lambda results:
4689            self.failUnlessEqual(results, self.new_data))
4690        # Now try appending things to the file
4691        d.addCallback(lambda ignored:
4692            self.PUT("/uri/%s?offset=%d" % (str(self.filecap, "utf-8"), len(self.new_data)),
4693                     b"puppies" * 100))
4694        d.addCallback(_get_data)
4695        d.addCallback(lambda results:
4696            self.failUnlessEqual(results, self.new_data + (b"puppies" * 100)))
4697        # and try replacing the beginning of the file
4698        d.addCallback(lambda ignored:
4699            self.PUT("/uri/%s?offset=0" % str(self.filecap, "utf-8"), b"begin"))
4700        d.addCallback(_get_data)
4701        d.addCallback(lambda results:
4702            self.failUnlessEqual(results, b"begin"+self.new_data[len(b"begin"):]+(b"puppies"*100)))
4703        return d
4704
4705    @inlineCallbacks
4706    def test_PUT_update_at_invalid_offset(self):
4707        file_contents = b"test file" * 100000 # about 900 KiB
4708        filecap = yield self.PUT("/uri?mutable=true", file_contents)
4709        # Negative offsets should cause an error.
4710        url = self.webish_url + "/uri/%s?offset=-1" % str(filecap, "utf-8")
4711        yield self.assertHTTPError(url, 400, "Invalid offset",
4712                                   method="put", data=b"foo")
4713
4714    @inlineCallbacks
4715    def test_PUT_update_at_offset_immutable(self):
4716        file_contents = b"Test file" * 100000
4717        filecap = yield self.PUT("/uri", file_contents)
4718        url = self.webish_url + "/uri/%s?offset=50" % str(filecap, "utf-8")
4719        yield self.assertHTTPError(url, 400, "immutable",
4720                                   method="put", data=b"foo")
4721
4722    @inlineCallbacks
4723    def test_bad_method(self):
4724        url = self.webish_url + self.public_url + "/foo/bar.txt"
4725        yield self.assertHTTPError(url, 501,
4726                                   "I don't know how to treat a BOGUS request.",
4727                                   method="BOGUS")
4728
4729    @inlineCallbacks
4730    def test_short_url(self):
4731        url = self.webish_url + "/uri"
4732        yield self.assertHTTPError(url, 501,
4733                                   "I don't know how to treat a DELETE request.",
4734                                   method="DELETE")
4735
4736    @inlineCallbacks
4737    def test_ophandle_bad(self):
4738        url = self.webish_url + "/operations/bogus?t=status"
4739        yield self.assertHTTPError(url, 404,
4740                                   "unknown/expired handle 'bogus'")
4741
4742    @inlineCallbacks
4743    def test_ophandle_cancel(self):
4744        url = self.webish_url + self.public_url + "/foo?t=start-manifest&ophandle=128"
4745        yield do_http("post", url,
4746                      allow_redirects=True, browser_like_redirects=True)
4747        res = yield self.GET("/operations/128?t=status&output=JSON")
4748        data = json.loads(res)
4749        self.failUnless("finished" in data, res)
4750        monitor = self.ws.getServiceNamed("operations").handles[b"128"][0]
4751
4752        res = yield self.POST("/operations/128?t=cancel&output=JSON")
4753        data = json.loads(res)
4754        self.failUnless("finished" in data, res)
4755        # t=cancel causes the handle to be forgotten
4756        self.failUnless(monitor.is_cancelled())
4757
4758        url = self.webish_url + "/operations/128?t=status&output=JSON"
4759        yield self.assertHTTPError(url, 404, "unknown/expired handle '128'")
4760
4761    @inlineCallbacks
4762    def test_ophandle_retainfor(self):
4763        url = self.webish_url + self.public_url + "/foo?t=start-manifest&ophandle=129&retain-for=60"
4764        yield do_http("post", url,
4765                      allow_redirects=True, browser_like_redirects=True)
4766        res = yield self.GET("/operations/129?t=status&output=JSON&retain-for=0")
4767        data = json.loads(res)
4768        self.failUnless("finished" in data, res)
4769
4770        # the retain-for=0 will cause the handle to be expired very soon
4771        yield self.clock.advance(2.0)
4772        url = self.webish_url + "/operations/129?t=status&output=JSON"
4773        yield self.assertHTTPError(url, 404, "unknown/expired handle '129'")
4774
4775    @inlineCallbacks
4776    def test_ophandle_release_after_complete(self):
4777        url = self.webish_url + self.public_url + "/foo?t=start-manifest&ophandle=130"
4778        yield do_http("post", url,
4779                      allow_redirects=True, browser_like_redirects=True)
4780        yield self.wait_for_operation(None, "130")
4781        yield self.GET("/operations/130?t=status&output=JSON&release-after-complete=true")
4782        # the release-after-complete=true will cause the handle to be expired
4783        op_url = self.webish_url + "/operations/130?t=status&output=JSON"
4784        yield self.assertHTTPError(op_url, 404, "unknown/expired handle '130'")
4785
4786    @inlineCallbacks
4787    def test_uncollected_ophandle_expiration(self):
4788        # uncollected ophandles should expire after 4 days
4789        def _make_uncollected_ophandle(ophandle):
4790            url = (self.webish_url + self.public_url +
4791                   "/foo?t=start-manifest&ophandle=%d" % ophandle)
4792            # When we start the operation, the webapi server will want to
4793            # redirect us to the page for the ophandle, so we get
4794            # confirmation that the operation has started. If the manifest
4795            # operation has finished by the time we get there, following that
4796            # redirect would have the side effect of collecting the ophandle
4797            # that we've just created, which means that we can't use the
4798            # ophandle to test the uncollected timeout anymore. So, instead,
4799            # catch+ignore any 302 here and don't follow it.
4800            d = treq.request("post", url, persistent=False)
4801            def _ignore_redirect(f):
4802                f.trap(client.ResponseFailed)
4803                e = f.value
4804                reasons = e.reasons
4805                r0 = reasons[0]
4806                r0.trap(error.PageRedirect)
4807            d.addErrback(_ignore_redirect)
4808            return d
4809        # Create an ophandle, don't collect it, then advance the clock by
4810        # 4 days - 1 second and make sure that the ophandle is still there.
4811        yield _make_uncollected_ophandle(131)
4812        yield self.clock.advance((96*60*60) - 1) # 96 hours = 4 days
4813        res = yield self.GET("/operations/131?t=status&output=JSON")
4814        data = json.loads(res)
4815        self.failUnless("finished" in data, res)
4816
4817        # Create an ophandle, don't collect it, then try to collect it
4818        # after 4 days. It should be gone.
4819        yield _make_uncollected_ophandle(132)
4820        yield self.clock.advance(96*60*60)
4821        op_url = self.webish_url + "/operations/132?t=status&output=JSON"
4822        yield self.assertHTTPError(op_url, 404, "unknown/expired handle '132'")
4823
4824    @inlineCallbacks
4825    def test_collected_ophandle_expiration(self):
4826        # collected ophandles should expire after 1 day
4827        def _make_collected_ophandle(ophandle):
4828            url = (self.webish_url + self.public_url +
4829                   "/foo?t=start-manifest&ophandle=%d" % ophandle)
4830            # By following the initial redirect, we collect the ophandle
4831            # we've just created.
4832            return do_http("post", url,
4833                           allow_redirects=True, browser_like_redirects=True)
4834        # Create a collected ophandle, then collect it after 23 hours
4835        # and 59 seconds to make sure that it is still there.
4836        yield _make_collected_ophandle(133)
4837        yield self.clock.advance((24*60*60) - 1)
4838        res = yield self.GET("/operations/133?t=status&output=JSON")
4839        data = json.loads(res)
4840        self.failUnless("finished" in data, res)
4841
4842        # Create another uncollected ophandle, then try to collect it
4843        # after 24 hours to make sure that it is gone.
4844        yield _make_collected_ophandle(134)
4845        yield self.clock.advance(24*60*60)
4846        op_url = self.webish_url + "/operations/134?t=status&output=JSON"
4847        yield self.assertHTTPError(op_url, 404, "unknown/expired handle '134'")
4848
4849    @inlineCallbacks
4850    def test_uri_redirect(self):
4851        """URI redirects don't cause failure.
4852
4853        Unit test reproducer for https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3590
4854        """
4855        def req(method, path, **kwargs):
4856            return treq.request(method, self.webish_url + path, persistent=False,
4857                                **kwargs)
4858
4859        response = yield req("POST", "/uri?format=sdmf&t=mkdir")
4860        dircap = yield response.content()
4861        assert dircap.startswith(b'URI:DIR2:')
4862        dircap_uri = "/uri/?uri={}&t=json".format(urlquote(dircap))
4863
4864        response = yield req(
4865            "GET",
4866            dircap_uri,
4867        )
4868        self.assertEqual(
4869            str(response.request.absoluteURI, "utf-8"),
4870            self.webish_url + "/uri/{}?t=json".format(urlquote(dircap)))
4871        if response.code >= 400:
4872            raise Error(response.code, response=response.content())
4873
4874    def test_incident(self):
4875        d = self.POST("/report_incident", details="eek")
4876        def _done(res):
4877            self.failIfIn(b"<html>", res)
4878            self.failUnlessIn(b"An incident report has been saved", res)
4879        d.addCallback(_done)
4880        return d
4881
4882    def test_static(self):
4883        webdir = os.path.join(self.staticdir, "subdir")
4884        fileutil.make_dirs(webdir)
4885        f = open(os.path.join(webdir, "hello.txt"), "wb")
4886        f.write(b"hello")
4887        f.close()
4888
4889        d = self.GET("/static/subdir/hello.txt")
4890        def _check(res):
4891            self.failUnlessReallyEqual(res, b"hello")
4892        d.addCallback(_check)
4893        return d
4894
4895    def test_static_missing(self):
4896        # self.staticdir does not exist yet, because we used self.mktemp()
4897        d = self.assertFailure(self.GET("/static"), error.Error)
4898        # If os.stat raises an exception for the missing directory and the
4899        # traceback reveals the parent directory name we don't want to see
4900        # that parent directory name in the response.  This addresses #1720.
4901        d.addCallback(lambda e: self.assertEquals(str(e), "404 Not Found"))
4902        return d
4903
4904
4905class HumanizeExceptionTests(TrialTestCase):
4906    """
4907    Tests for ``humanize_exception``.
4908    """
4909    def test_mustbereadonly(self):
4910        """
4911        ``humanize_exception`` describes ``MustBeReadonlyError``.
4912        """
4913        text, code = humanize_exception(
4914            MustBeReadonlyError(
4915                "URI:DIR2 directory writecap used in a read-only context",
4916                "<unknown name>",
4917            ),
4918        )
4919        self.assertIn("MustBeReadonlyError", text)
4920        self.assertEqual(code, http.BAD_REQUEST)
4921
4922    def test_filetoolarge(self):
4923        """
4924        ``humanize_exception`` describes ``FileTooLargeError``.
4925        """
4926        text, code = humanize_exception(
4927            FileTooLargeError(
4928                "This file is too large to be uploaded (data_size).",
4929            ),
4930        )
4931        self.assertIn("FileTooLargeError", text)
4932        self.assertEqual(code, http.REQUEST_ENTITY_TOO_LARGE)
Note: See TracBrowser for help on using the repository browser.