source: trunk/src/allmydata/test/test_system.py

Last change on this file was 4da491a, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-03-11T20:37:27Z

remove more usage of "future"

  • Property mode set to 100644
File size: 83.8 KB
Line 
1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6from past.builtins import chr as byteschr
7from six import ensure_text
8
9import os, re, sys, time, json
10from typing import Optional
11
12from bs4 import BeautifulSoup
13
14from twisted.trial import unittest
15from twisted.internet import defer
16
17from allmydata import uri
18from allmydata.storage.mutable import MutableShareFile
19from allmydata.storage.immutable import ShareFile
20from allmydata.storage.server import si_a2b
21from allmydata.immutable import offloaded, upload
22from allmydata.immutable.literal import LiteralFileNode
23from allmydata.immutable.filenode import ImmutableFileNode
24from allmydata.util import idlib, mathutil
25from allmydata.util import log, base32
26from allmydata.util.encodingutil import quote_output, unicode_to_argv
27from allmydata.util.fileutil import abspath_expanduser_unicode
28from allmydata.util.consumer import MemoryConsumer, download_to_data
29from allmydata.util.deferredutil import async_to_deferred
30from allmydata.interfaces import IDirectoryNode, IFileNode, \
31     NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION
32from allmydata.monitor import Monitor
33from allmydata.mutable.common import NotWriteableError
34from allmydata.mutable import layout as mutable_layout
35from allmydata.mutable.publish import MutableData
36
37from foolscap.api import DeadReferenceError, fireEventually
38from twisted.python.failure import Failure
39from twisted.internet.utils import (
40    getProcessOutputAndValue,
41)
42
43from .common_web import do_http as do_http_bytes, Error
44from .web.common import (
45    assert_soup_has_tag_with_attributes
46)
47from .common_system import SystemTestMixin
48from .common_util import run_cli_unicode
49
50
51class RunBinTahoeMixin(object):
52    def run_bintahoe(self, args, stdin=None, python_options:Optional[list[str]]=None, env=None):
53        # test_runner.run_bintahoe has better unicode support but doesn't
54        # support env yet and is also synchronous.  If we could get rid of
55        # this in favor of that, though, it would probably be an improvement.
56        if python_options is None:
57            python_options = []
58        command = sys.executable
59        argv = python_options + ["-b", "-m", "allmydata.scripts.runner"] + args
60
61        if env is None:
62            env = os.environ
63
64        d = getProcessOutputAndValue(command, argv, env, stdinBytes=stdin)
65        def fix_signal(result):
66            # Mirror subprocess.Popen.returncode structure
67            (out, err, signal) = result
68            return (out, err, -signal)
69        d.addErrback(fix_signal)
70        return d
71
72
73def run_cli(*args, **kwargs):
74    """
75    Run a Tahoe-LAFS CLI utility, but inline.
76
77    Version of run_cli_unicode() that takes any kind of string, and the
78    command-line args inline instead of as verb + list.
79
80    Backwards compatible version so we don't have to change all the tests that
81    expected this API.
82    """
83    nodeargs = [ensure_text(a) for a in kwargs.pop("nodeargs", [])]
84    kwargs["nodeargs"] = nodeargs
85    return run_cli_unicode(
86        ensure_text(args[0]), [ensure_text(a) for a in args[1:]], **kwargs)
87
88
89def do_http(*args, **kwargs):
90    """Wrapper for do_http() that returns Unicode."""
91    return do_http_bytes(*args, **kwargs).addCallback(
92        lambda b: str(b, "utf-8"))
93
94
95LARGE_DATA = b"""
96This is some data to publish to the remote grid.., which needs to be large
97enough to not fit inside a LIT uri.
98"""
99
100
101class CountingDataUploadable(upload.Data):
102    bytes_read = 0
103    interrupt_after = None
104    interrupt_after_d = None
105
106    def read(self, length):
107        self.bytes_read += length
108        if self.interrupt_after is not None:
109            if self.bytes_read > self.interrupt_after:
110                self.interrupt_after = None
111                self.interrupt_after_d.callback(self)
112        return upload.Data.read(self, length)
113
114
115class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
116    """Foolscap integration-y tests."""
117    FORCE_FOOLSCAP_FOR_STORAGE = True
118    timeout = 180
119
120    @property
121    def basedir(self):
122        return "system/SystemTest/{}-foolscap-{}".format(
123            self.id().split(".")[-1], self.FORCE_FOOLSCAP_FOR_STORAGE
124        )
125
126    def test_connections(self):
127        d = self.set_up_nodes()
128        self.extra_node = None
129        d.addCallback(lambda res: self.add_extra_node(self.numclients))
130        def _check(extra_node):
131            self.extra_node = extra_node
132            for c in self.clients:
133                all_peerids = c.get_storage_broker().get_all_serverids()
134                self.failUnlessEqual(len(all_peerids), self.numclients+1)
135                sb = c.storage_broker
136                permuted_peers = sb.get_servers_for_psi("a")
137                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
138
139        d.addCallback(_check)
140        def _shutdown_extra_node(res):
141            if self.extra_node:
142                return self.extra_node.stopService()
143            return res
144        d.addBoth(_shutdown_extra_node)
145        return d
146    # test_connections is subsumed by test_upload_and_download, and takes
147    # quite a while to run on a slow machine (because of all the TLS
148    # connections that must be established). If we ever rework the introducer
149    # code to such an extent that we're not sure if it works anymore, we can
150    # reinstate this test until it does.
151    del test_connections
152
153    def test_upload_and_download_random_key(self):
154        return self._test_upload_and_download(convergence=None)
155
156    def test_upload_and_download_convergent(self):
157        return self._test_upload_and_download(convergence=b"some convergence string")
158
159    def _test_upload_and_download(self, convergence):
160        # we use 4000 bytes of data, which will result in about 400k written
161        # to disk among all our simulated nodes
162        DATA = b"Some data to upload\n" * 200
163        d = self.set_up_nodes()
164        def _check_connections(res):
165            for c in self.clients:
166                c.encoding_params['happy'] = 5
167                all_peerids = c.get_storage_broker().get_all_serverids()
168                self.failUnlessEqual(len(all_peerids), self.numclients)
169                sb = c.storage_broker
170                permuted_peers = sb.get_servers_for_psi(b"a")
171                self.failUnlessEqual(len(permuted_peers), self.numclients)
172        d.addCallback(_check_connections)
173
174        def _do_upload(res):
175            log.msg("UPLOADING")
176            u = self.clients[0].getServiceNamed("uploader")
177            self.uploader = u
178            # we crank the max segsize down to 1024b for the duration of this
179            # test, so we can exercise multiple segments. It is important
180            # that this is not a multiple of the segment size, so that the
181            # tail segment is not the same length as the others. This actualy
182            # gets rounded up to 1025 to be a multiple of the number of
183            # required shares (since we use 25 out of 100 FEC).
184            up = upload.Data(DATA, convergence=convergence)
185            up.max_segment_size = 1024
186            d1 = u.upload(up)
187            return d1
188        d.addCallback(_do_upload)
189        def _upload_done(results):
190            theuri = results.get_uri()
191            log.msg("upload finished: uri is %r" % (theuri,))
192            self.uri = theuri
193            assert isinstance(self.uri, bytes), self.uri
194            self.cap = uri.from_string(self.uri)
195            self.n = self.clients[1].create_node_from_uri(self.uri)
196        d.addCallback(_upload_done)
197
198        def _upload_again(res):
199            # Upload again. If using convergent encryption then this ought to be
200            # short-circuited, however with the way we currently generate URIs
201            # (i.e. because they include the roothash), we have to do all of the
202            # encoding work, and only get to save on the upload part.
203            log.msg("UPLOADING AGAIN")
204            up = upload.Data(DATA, convergence=convergence)
205            up.max_segment_size = 1024
206            return self.uploader.upload(up)
207        d.addCallback(_upload_again)
208
209        def _download_to_data(res):
210            log.msg("DOWNLOADING")
211            return download_to_data(self.n)
212        d.addCallback(_download_to_data)
213        def _download_to_data_done(data):
214            log.msg("download finished")
215            self.failUnlessEqual(data, DATA)
216        d.addCallback(_download_to_data_done)
217
218        def _test_read(res):
219            n = self.clients[1].create_node_from_uri(self.uri)
220            d = download_to_data(n)
221            def _read_done(data):
222                self.failUnlessEqual(data, DATA)
223            d.addCallback(_read_done)
224            d.addCallback(lambda ign:
225                          n.read(MemoryConsumer(), offset=1, size=4))
226            def _read_portion_done(mc):
227                self.failUnlessEqual(b"".join(mc.chunks), DATA[1:1+4])
228            d.addCallback(_read_portion_done)
229            d.addCallback(lambda ign:
230                          n.read(MemoryConsumer(), offset=2, size=None))
231            def _read_tail_done(mc):
232                self.failUnlessEqual(b"".join(mc.chunks), DATA[2:])
233            d.addCallback(_read_tail_done)
234            d.addCallback(lambda ign:
235                          n.read(MemoryConsumer(), size=len(DATA)+1000))
236            def _read_too_much(mc):
237                self.failUnlessEqual(b"".join(mc.chunks), DATA)
238            d.addCallback(_read_too_much)
239
240            return d
241        d.addCallback(_test_read)
242
243        def _test_bad_read(res):
244            bad_u = uri.from_string_filenode(self.uri)
245            bad_u.key = self.flip_bit(bad_u.key)
246            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
247            # this should cause an error during download
248
249            d = self.shouldFail2(NoSharesError, "'download bad node'",
250                                 None,
251                                 bad_n.read, MemoryConsumer(), offset=2)
252            return d
253        d.addCallback(_test_bad_read)
254
255        def _download_nonexistent_uri(res):
256            baduri = self.mangle_uri(self.uri)
257            badnode = self.clients[1].create_node_from_uri(baduri)
258            log.msg("about to download non-existent URI", level=log.UNUSUAL,
259                    facility="tahoe.tests")
260            d1 = download_to_data(badnode)
261            def _baduri_should_fail(res):
262                log.msg("finished downloading non-existent URI",
263                        level=log.UNUSUAL, facility="tahoe.tests")
264                self.failUnless(isinstance(res, Failure))
265                self.failUnless(res.check(NoSharesError),
266                                "expected NoSharesError, got %s" % res)
267            d1.addBoth(_baduri_should_fail)
268            return d1
269        d.addCallback(_download_nonexistent_uri)
270
271        # add a new node, which doesn't accept shares, and only uses the
272        # helper for upload.
273        d.addCallback(lambda res: self.add_extra_node(self.numclients,
274                                                      self.helper_furl,
275                                                      add_to_sparent=True))
276        def _added(extra_node):
277            self.extra_node = extra_node
278            self.extra_node.encoding_params['happy'] = 5
279        d.addCallback(_added)
280
281        def _has_helper():
282            uploader = self.extra_node.getServiceNamed("uploader")
283            furl, connected = uploader.get_helper_info()
284            return connected
285        d.addCallback(lambda ign: self.poll(_has_helper))
286
287        HELPER_DATA = b"Data that needs help to upload" * 1000
288        def _upload_with_helper(res):
289            u = upload.Data(HELPER_DATA, convergence=convergence)
290            d = self.extra_node.upload(u)
291            def _uploaded(results):
292                n = self.clients[1].create_node_from_uri(results.get_uri())
293                return download_to_data(n)
294            d.addCallback(_uploaded)
295            def _check(newdata):
296                self.failUnlessEqual(newdata, HELPER_DATA)
297            d.addCallback(_check)
298            return d
299        d.addCallback(_upload_with_helper)
300
301        def _upload_duplicate_with_helper(res):
302            u = upload.Data(HELPER_DATA, convergence=convergence)
303            u.debug_stash_RemoteEncryptedUploadable = True
304            d = self.extra_node.upload(u)
305            def _uploaded(results):
306                n = self.clients[1].create_node_from_uri(results.get_uri())
307                return download_to_data(n)
308            d.addCallback(_uploaded)
309            def _check(newdata):
310                self.failUnlessEqual(newdata, HELPER_DATA)
311                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
312                            "uploadable started uploading, should have been avoided")
313            d.addCallback(_check)
314            return d
315        if convergence is not None:
316            d.addCallback(_upload_duplicate_with_helper)
317
318        d.addCallback(fireEventually)
319
320        def _upload_resumable(res):
321            DATA = b"Data that needs help to upload and gets interrupted" * 1000
322            u1 = CountingDataUploadable(DATA, convergence=convergence)
323            u2 = CountingDataUploadable(DATA, convergence=convergence)
324
325            # we interrupt the connection after about 5kB by shutting down
326            # the helper, then restarting it.
327            u1.interrupt_after = 5000
328            u1.interrupt_after_d = defer.Deferred()
329            bounced_d = defer.Deferred()
330            def _do_bounce(res):
331                d = self.bounce_client(0)
332                d.addBoth(bounced_d.callback)
333            u1.interrupt_after_d.addCallback(_do_bounce)
334
335            # sneak into the helper and reduce its chunk size, so that our
336            # debug_interrupt will sever the connection on about the fifth
337            # chunk fetched. This makes sure that we've started to write the
338            # new shares before we abandon them, which exercises the
339            # abort/delete-partial-share code. TODO: find a cleaner way to do
340            # this. I know that this will affect later uses of the helper in
341            # this same test run, but I'm not currently worried about it.
342            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
343
344            upload_d = self.extra_node.upload(u1)
345            # The upload will start, and bounce_client() will be called after
346            # about 5kB. bounced_d will fire after bounce_client() finishes
347            # shutting down and restarting the node.
348            d = bounced_d
349            def _bounced(ign):
350                # By this point, the upload should have failed because of the
351                # interruption. upload_d will fire in a moment
352                def _should_not_finish(res):
353                    self.fail("interrupted upload should have failed, not"
354                              " finished with result %s" % (res,))
355                def _interrupted(f):
356                    f.trap(DeadReferenceError)
357                    # make sure we actually interrupted it before finishing
358                    # the file
359                    self.failUnless(u1.bytes_read < len(DATA),
360                                    "read %d out of %d total" %
361                                    (u1.bytes_read, len(DATA)))
362                upload_d.addCallbacks(_should_not_finish, _interrupted)
363                return upload_d
364            d.addCallback(_bounced)
365
366            def _disconnected(res):
367                # check to make sure the storage servers aren't still hanging
368                # on to the partial share: their incoming/ directories should
369                # now be empty.
370                log.msg("disconnected", level=log.NOISY,
371                        facility="tahoe.test.test_system")
372                for i in range(self.numclients):
373                    incdir = os.path.join(self.getdir("client%d" % i),
374                                          "storage", "shares", "incoming")
375                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
376            d.addCallback(_disconnected)
377
378            d.addCallback(lambda res:
379                          log.msg("wait_for_helper", level=log.NOISY,
380                                  facility="tahoe.test.test_system"))
381            # then we need to wait for the extra node to reestablish its
382            # connection to the helper.
383            d.addCallback(lambda ign: self.poll(_has_helper))
384
385            d.addCallback(lambda res:
386                          log.msg("uploading again", level=log.NOISY,
387                                  facility="tahoe.test.test_system"))
388            d.addCallback(lambda res: self.extra_node.upload(u2))
389
390            def _uploaded(results):
391                cap = results.get_uri()
392                log.msg("Second upload complete", level=log.NOISY,
393                        facility="tahoe.test.test_system")
394
395                # this is really bytes received rather than sent, but it's
396                # convenient and basically measures the same thing
397                bytes_sent = results.get_ciphertext_fetched()
398                self.failUnless(isinstance(bytes_sent, int), bytes_sent)
399
400                # We currently don't support resumption of upload if the data is
401                # encrypted with a random key.  (Because that would require us
402                # to store the key locally and re-use it on the next upload of
403                # this file, which isn't a bad thing to do, but we currently
404                # don't do it.)
405                if convergence is not None:
406                    # Make sure we did not have to read the whole file the
407                    # second time around .
408                    self.failUnless(bytes_sent < len(DATA),
409                                    "resumption didn't save us any work:"
410                                    " read %r bytes out of %r total" %
411                                    (bytes_sent, len(DATA)))
412                else:
413                    # Make sure we did have to read the whole file the second
414                    # time around -- because the one that we partially uploaded
415                    # earlier was encrypted with a different random key.
416                    self.failIf(bytes_sent < len(DATA),
417                                "resumption saved us some work even though we were using random keys:"
418                                " read %r bytes out of %r total" %
419                                (bytes_sent, len(DATA)))
420                n = self.clients[1].create_node_from_uri(cap)
421                return download_to_data(n)
422            d.addCallback(_uploaded)
423
424            def _check(newdata):
425                self.failUnlessEqual(newdata, DATA)
426                # If using convergent encryption, then also check that the
427                # helper has removed the temp file from its directories.
428                if convergence is not None:
429                    basedir = os.path.join(self.getdir("client0"), "helper")
430                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
431                    self.failUnlessEqual(files, [])
432                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
433                    self.failUnlessEqual(files, [])
434            d.addCallback(_check)
435            return d
436        d.addCallback(_upload_resumable)
437
438        def _grab_stats(ignored):
439            stats = self.clients[0].stats_provider.get_stats()
440            s = stats["stats"]
441            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
442            c = stats["counters"]
443            self.failUnless("storage_server.allocate" in c)
444        d.addCallback(_grab_stats)
445
446        return d
447
448    def _find_all_shares(self, basedir):
449        shares = []
450        for (dirpath, dirnames, filenames) in os.walk(basedir):
451            if "storage" not in dirpath:
452                continue
453            if not filenames:
454                continue
455            pieces = dirpath.split(os.sep)
456            if (len(pieces) >= 5
457                and pieces[-4] == "storage"
458                and pieces[-3] == "shares"):
459                # we're sitting in .../storage/shares/$START/$SINDEX , and there
460                # are sharefiles here
461                assert pieces[-5].startswith("client")
462                client_num = int(pieces[-5][-1])
463                storage_index_s = pieces[-1]
464                storage_index = si_a2b(storage_index_s.encode("ascii"))
465                for sharename in filenames:
466                    shnum = int(sharename)
467                    filename = os.path.join(dirpath, sharename)
468                    data = (client_num, storage_index, filename, shnum)
469                    shares.append(data)
470        if not shares:
471            self.fail("unable to find any share files in %s" % basedir)
472        return shares
473
474    def _corrupt_mutable_share(self, filename, which):
475        msf = MutableShareFile(filename)
476        # Read more than share length:
477        datav = msf.readv([ (0, 10_000_000) ])
478        final_share = datav[0]
479        assert len(final_share) < 10_000_000 # ought to be truncated
480        pieces = mutable_layout.unpack_share(final_share)
481        (seqnum, root_hash, IV, k, N, segsize, datalen,
482         verification_key, signature, share_hash_chain, block_hash_tree,
483         share_data, enc_privkey) = pieces
484
485        if which == "seqnum":
486            seqnum = seqnum + 15
487        elif which == "R":
488            root_hash = self.flip_bit(root_hash)
489        elif which == "IV":
490            IV = self.flip_bit(IV)
491        elif which == "segsize":
492            segsize = segsize + 15
493        elif which == "pubkey":
494            verification_key = self.flip_bit(verification_key)
495        elif which == "signature":
496            signature = self.flip_bit(signature)
497        elif which == "share_hash_chain":
498            nodenum = list(share_hash_chain.keys())[0]
499            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
500        elif which == "block_hash_tree":
501            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
502        elif which == "share_data":
503            share_data = self.flip_bit(share_data)
504        elif which == "encprivkey":
505            enc_privkey = self.flip_bit(enc_privkey)
506
507        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
508                                            segsize, datalen)
509        final_share = mutable_layout.pack_share(prefix,
510                                                verification_key,
511                                                signature,
512                                                share_hash_chain,
513                                                block_hash_tree,
514                                                share_data,
515                                                enc_privkey)
516        msf.writev( [(0, final_share)], None)
517
518
519    def test_mutable_sdmf(self):
520        """SDMF mutables can be uploaded, downloaded, and many other things."""
521        return self._test_mutable(SDMF_VERSION)
522
523    def test_mutable_mdmf(self):
524        """MDMF mutables can be uploaded, downloaded, and many other things."""
525        return self._test_mutable(MDMF_VERSION)
526
527    def _test_mutable(self, mutable_version):
528        DATA = b"initial contents go here."  # 25 bytes % 3 != 0
529        DATA_uploadable = MutableData(DATA)
530        NEWDATA = b"new contents yay"
531        NEWDATA_uploadable = MutableData(NEWDATA)
532        NEWERDATA = b"this is getting old" * 1_000_000
533        NEWERDATA_uploadable = MutableData(NEWERDATA)
534
535        d = self.set_up_nodes()
536
537        def _create_mutable(res):
538            c = self.clients[0]
539            log.msg("starting create_mutable_file")
540            d1 = c.create_mutable_file(DATA_uploadable, mutable_version)
541            def _done(res):
542                log.msg("DONE: %s" % (res,))
543                self._mutable_node_1 = res
544            d1.addCallback(_done)
545            return d1
546        d.addCallback(_create_mutable)
547
548        @defer.inlineCallbacks
549        def _test_debug(res):
550            # find a share. It is important to run this while there is only
551            # one slot in the grid.
552            shares = self._find_all_shares(self.basedir)
553            (client_num, storage_index, filename, shnum) = shares[0]
554            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
555                    % filename)
556            log.msg(" for clients[%d]" % client_num)
557
558            rc,output,err = yield run_cli("debug", "dump-share", "--offsets",
559                                          filename)
560            self.failUnlessEqual(rc, 0)
561            try:
562                share_type = 'SDMF' if mutable_version == SDMF_VERSION else 'MDMF'
563                self.failUnless("Mutable slot found:\n" in output)
564                self.assertIn(f"share_type: {share_type}\n", output)
565                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
566                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
567                self.failUnless(" num_extra_leases: 0\n" in output)
568                self.failUnless("  secrets are for nodeid: %s\n" % peerid
569                                in output)
570                self.failUnless(f" {share_type} contents:\n" in output)
571                self.failUnless("  seqnum: 1\n" in output)
572                self.failUnless("  required_shares: 3\n" in output)
573                self.failUnless("  total_shares: 10\n" in output)
574                if mutable_version == SDMF_VERSION:
575                    self.failUnless("  segsize: 27\n" in output, (output, filename))
576                self.failUnless("  datalen: 25\n" in output)
577                # the exact share_hash_chain nodes depends upon the sharenum,
578                # and is more of a hassle to compute than I want to deal with
579                # now
580                self.failUnless("  share_hash_chain: " in output)
581                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
582                if mutable_version == SDMF_VERSION:
583                    expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
584                                str(base32.b2a(storage_index), "ascii"))
585                else:
586                    expected = ("  verify-cap: URI:MDMF-Verifier:%s" %
587                                str(base32.b2a(storage_index), "ascii"))
588                self.assertIn(expected, output)
589            except unittest.FailTest:
590                print()
591                print("dump-share output was:")
592                print(output)
593                raise
594        d.addCallback(_test_debug)
595
596        # test retrieval
597
598        # first, let's see if we can use the existing node to retrieve the
599        # contents. This allows it to use the cached pubkey and maybe the
600        # latest-known sharemap.
601
602        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
603        def _check_download_1(res):
604            self.failUnlessEqual(res, DATA)
605            # now we see if we can retrieve the data from a new node,
606            # constructed using the URI of the original one. We do this test
607            # on the same client that uploaded the data.
608            uri = self._mutable_node_1.get_uri()
609            log.msg("starting retrieve1")
610            newnode = self.clients[0].create_node_from_uri(uri)
611            newnode_2 = self.clients[0].create_node_from_uri(uri)
612            self.failUnlessIdentical(newnode, newnode_2)
613            return newnode.download_best_version()
614        d.addCallback(_check_download_1)
615
616        def _check_download_2(res):
617            self.failUnlessEqual(res, DATA)
618            # same thing, but with a different client
619            uri = self._mutable_node_1.get_uri()
620            newnode = self.clients[1].create_node_from_uri(uri)
621            log.msg("starting retrieve2")
622            d1 = newnode.download_best_version()
623            d1.addCallback(lambda res: (res, newnode))
624            return d1
625        d.addCallback(_check_download_2)
626
627        def _check_download_3(res_and_newnode):
628            (res, newnode) = res_and_newnode
629            self.failUnlessEqual(res, DATA)
630            # replace the data
631            log.msg("starting replace1")
632            d1 = newnode.overwrite(NEWDATA_uploadable)
633            d1.addCallback(lambda res: newnode.download_best_version())
634            return d1
635        d.addCallback(_check_download_3)
636
637        def _check_download_4(res):
638            self.failUnlessEqual(res, NEWDATA)
639            # now create an even newer node and replace the data on it. This
640            # new node has never been used for download before.
641            uri = self._mutable_node_1.get_uri()
642            newnode1 = self.clients[2].create_node_from_uri(uri)
643            newnode2 = self.clients[3].create_node_from_uri(uri)
644            self._newnode3 = self.clients[3].create_node_from_uri(uri)
645            log.msg("starting replace2")
646            d1 = newnode1.overwrite(NEWERDATA_uploadable)
647            d1.addCallback(lambda res: newnode2.download_best_version())
648            return d1
649        d.addCallback(_check_download_4)
650
651        def _check_download_5(res):
652            log.msg("finished replace2")
653            self.failUnlessEqual(res, NEWERDATA)
654        d.addCallback(_check_download_5)
655
656        # The previous checks upload a complete replacement. This uses a
657        # different API that is supposed to do a partial write at an offset.
658        @async_to_deferred
659        async def _check_write_at_offset(newnode):
660            log.msg("writing at offset")
661            start = b"abcdef"
662            expected = b"abXYef"
663            uri = self._mutable_node_1.get_uri()
664            newnode = self.clients[0].create_node_from_uri(uri)
665            await newnode.overwrite(MutableData(start))
666            version = await newnode.get_mutable_version()
667            await version.update(MutableData(b"XY"), 2)
668            result = await newnode.download_best_version()
669            self.assertEqual(result, expected)
670            # Revert to previous version
671            await newnode.overwrite(MutableData(NEWERDATA))
672        d.addCallback(_check_write_at_offset)
673
674        def _corrupt_shares(_res):
675            # run around and flip bits in all but k of the shares, to test
676            # the hash checks
677            shares = self._find_all_shares(self.basedir)
678            ## sort by share number
679            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
680            where = dict([ (shnum, filename)
681                           for (client_num, storage_index, filename, shnum)
682                           in shares ])
683            assert len(where) == 10 # this test is designed for 3-of-10
684            for shnum, filename in list(where.items()):
685                # shares 7,8,9 are left alone. read will check
686                # (share_hash_chain, block_hash_tree, share_data). New
687                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
688                # segsize, signature).
689                if shnum == 0:
690                    # read: this will trigger "pubkey doesn't match
691                    # fingerprint".
692                    self._corrupt_mutable_share(filename, "pubkey")
693                    self._corrupt_mutable_share(filename, "encprivkey")
694                elif shnum == 1:
695                    # triggers "signature is invalid"
696                    self._corrupt_mutable_share(filename, "seqnum")
697                elif shnum == 2:
698                    # triggers "signature is invalid"
699                    self._corrupt_mutable_share(filename, "R")
700                elif shnum == 3:
701                    # triggers "signature is invalid"
702                    self._corrupt_mutable_share(filename, "segsize")
703                elif shnum == 4:
704                    self._corrupt_mutable_share(filename, "share_hash_chain")
705                elif shnum == 5:
706                    self._corrupt_mutable_share(filename, "block_hash_tree")
707                elif shnum == 6:
708                    self._corrupt_mutable_share(filename, "share_data")
709                # other things to correct: IV, signature
710                # 7,8,9 are left alone
711
712                # note that initial_query_count=5 means that we'll hit the
713                # first 5 servers in effectively random order (based upon
714                # response time), so we won't necessarily ever get a "pubkey
715                # doesn't match fingerprint" error (if we hit shnum>=1 before
716                # shnum=0, we pull the pubkey from there). To get repeatable
717                # specific failures, we need to set initial_query_count=1,
718                # but of course that will change the sequencing behavior of
719                # the retrieval process. TODO: find a reasonable way to make
720                # this a parameter, probably when we expand this test to test
721                # for one failure mode at a time.
722
723                # when we retrieve this, we should get three signature
724                # failures (where we've mangled seqnum, R, and segsize). The
725                # pubkey mangling
726
727        if mutable_version == SDMF_VERSION:
728            # TODO Corrupting shares in test_systm doesn't work for MDMF right now
729            d.addCallback(_corrupt_shares)
730
731        d.addCallback(lambda res: self._newnode3.download_best_version())
732        d.addCallback(_check_download_5)
733
734        def _check_empty_file(res):
735            # make sure we can create empty files, this usually screws up the
736            # segsize math
737            d1 = self.clients[2].create_mutable_file(MutableData(b""), mutable_version)
738            d1.addCallback(lambda newnode: newnode.download_best_version())
739            d1.addCallback(lambda res: self.failUnlessEqual(b"", res))
740            return d1
741        d.addCallback(_check_empty_file)
742
743        d.addCallback(lambda res: self.clients[0].create_dirnode())
744        def _created_dirnode(dnode):
745            log.msg("_created_dirnode(%s)" % (dnode,))
746            d1 = dnode.list()
747            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
748            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
749            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
750            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
751            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
752            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
753            d1.addCallback(lambda res: dnode.build_manifest().when_done())
754            d1.addCallback(lambda res:
755                           self.failUnlessEqual(len(res["manifest"]), 1))
756            return d1
757        d.addCallback(_created_dirnode)
758
759        return d
760
761    def flip_bit(self, good):
762        return good[:-1] + byteschr(ord(good[-1:]) ^ 0x01)
763
764    def mangle_uri(self, gooduri):
765        # change the key, which changes the storage index, which means we'll
766        # be asking about the wrong file, so nobody will have any shares
767        u = uri.from_string(gooduri)
768        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
769                            uri_extension_hash=u.uri_extension_hash,
770                            needed_shares=u.needed_shares,
771                            total_shares=u.total_shares,
772                            size=u.size)
773        return u2.to_string()
774
775    # TODO: add a test which mangles the uri_extension_hash instead, and
776    # should fail due to not being able to get a valid uri_extension block.
777    # Also a test which sneakily mangles the uri_extension block to change
778    # some of the validation data, so it will fail in the post-download phase
779    # when the file's crypttext integrity check fails. Do the same thing for
780    # the key, which should cause the download to fail the post-download
781    # plaintext_hash check.
782
783    def test_filesystem(self):
784        self.data = LARGE_DATA
785        d = self.set_up_nodes(2)
786        def _new_happy_semantics(ign):
787            for c in self.clients:
788                c.encoding_params['happy'] = 1
789        d.addCallback(_new_happy_semantics)
790        d.addCallback(self.log, "starting publish")
791        d.addCallback(self._do_publish1)
792        d.addCallback(self._test_runner)
793        d.addCallback(self._do_publish2)
794        # at this point, we have the following filesystem (where "R" denotes
795        # self._root_directory_uri):
796        # R
797        # R/subdir1
798        # R/subdir1/mydata567
799        # R/subdir1/subdir2/
800        # R/subdir1/subdir2/mydata992
801
802        d.addCallback(lambda res: self.bounce_client(0))
803        d.addCallback(self.log, "bounced client0")
804
805        d.addCallback(self._check_publish1)
806        d.addCallback(self.log, "did _check_publish1")
807        d.addCallback(self._check_publish2)
808        d.addCallback(self.log, "did _check_publish2")
809        d.addCallback(self._do_publish_private)
810        d.addCallback(self.log, "did _do_publish_private")
811        # now we also have (where "P" denotes a new dir):
812        #  P/personal/sekrit data
813        #  P/s2-rw -> /subdir1/subdir2/
814        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
815        d.addCallback(self._check_publish_private)
816        d.addCallback(self.log, "did _check_publish_private")
817        d.addCallback(self._test_web)
818        d.addCallback(self._test_cli)
819        # P now has four top-level children:
820        # P/personal/sekrit data
821        # P/s2-ro/
822        # P/s2-rw/
823        # P/test_put/  (empty)
824        d.addCallback(self._test_checker)
825        return d
826
827    def _do_publish1(self, res):
828        ut = upload.Data(self.data, convergence=None)
829        c0 = self.clients[0]
830        d = c0.create_dirnode()
831        def _made_root(new_dirnode):
832            self._root_directory_uri = new_dirnode.get_uri()
833            return c0.create_node_from_uri(self._root_directory_uri)
834        d.addCallback(_made_root)
835        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
836        def _made_subdir1(subdir1_node):
837            self._subdir1_node = subdir1_node
838            d1 = subdir1_node.add_file(u"mydata567", ut)
839            d1.addCallback(self.log, "publish finished")
840            def _stash_uri(filenode):
841                self.uri = filenode.get_uri()
842                assert isinstance(self.uri, bytes), (self.uri, filenode)
843            d1.addCallback(_stash_uri)
844            return d1
845        d.addCallback(_made_subdir1)
846        return d
847
848    def _do_publish2(self, res):
849        ut = upload.Data(self.data, convergence=None)
850        d = self._subdir1_node.create_subdirectory(u"subdir2")
851        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
852        return d
853
854    def log(self, res, *args, **kwargs):
855        # print("MSG: %s  RES: %s" % (msg, args))
856        log.msg(*args, **kwargs)
857        return res
858
859    def _do_publish_private(self, res):
860        self.smalldata = b"sssh, very secret stuff"
861        ut = upload.Data(self.smalldata, convergence=None)
862        d = self.clients[0].create_dirnode()
863        d.addCallback(self.log, "GOT private directory")
864        def _got_new_dir(privnode):
865            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
866            d1 = privnode.create_subdirectory(u"personal")
867            d1.addCallback(self.log, "made P/personal")
868            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
869            d1.addCallback(self.log, "made P/personal/sekrit data")
870            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
871            def _got_s2(s2node):
872                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
873                                      s2node.get_readonly_uri())
874                d2.addCallback(lambda node:
875                               privnode.set_uri(u"s2-ro",
876                                                s2node.get_readonly_uri(),
877                                                s2node.get_readonly_uri()))
878                return d2
879            d1.addCallback(_got_s2)
880            d1.addCallback(lambda res: privnode)
881            return d1
882        d.addCallback(_got_new_dir)
883        return d
884
885    def _check_publish1(self, res):
886        # this one uses the iterative API
887        c1 = self.clients[1]
888        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
889        d.addCallback(self.log, "check_publish1 got /")
890        d.addCallback(lambda root: root.get(u"subdir1"))
891        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
892        d.addCallback(lambda filenode: download_to_data(filenode))
893        d.addCallback(self.log, "get finished")
894        def _get_done(data):
895            self.failUnlessEqual(data, self.data)
896        d.addCallback(_get_done)
897        return d
898
899    def _check_publish2(self, res):
900        # this one uses the path-based API
901        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
902        d = rootnode.get_child_at_path(u"subdir1")
903        d.addCallback(lambda dirnode:
904                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
905        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
906        d.addCallback(lambda filenode: download_to_data(filenode))
907        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
908
909        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
910        def _got_filenode(filenode):
911            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
912            assert fnode == filenode
913        d.addCallback(_got_filenode)
914        return d
915
916    def _check_publish_private(self, resnode):
917        # this one uses the path-based API
918        self._private_node = resnode
919
920        d = self._private_node.get_child_at_path(u"personal")
921        def _got_personal(personal):
922            self._personal_node = personal
923            return personal
924        d.addCallback(_got_personal)
925
926        d.addCallback(lambda dirnode:
927                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
928        def get_path(path):
929            return self._private_node.get_child_at_path(path)
930
931        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
932        d.addCallback(lambda filenode: download_to_data(filenode))
933        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
934        d.addCallback(lambda res: get_path(u"s2-rw"))
935        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
936        d.addCallback(lambda res: get_path(u"s2-ro"))
937        def _got_s2ro(dirnode):
938            self.failUnless(dirnode.is_mutable(), dirnode)
939            self.failUnless(dirnode.is_readonly(), dirnode)
940            d1 = defer.succeed(None)
941            d1.addCallback(lambda res: dirnode.list())
942            d1.addCallback(self.log, "dirnode.list")
943
944            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
945
946            d1.addCallback(self.log, "doing add_file(ro)")
947            ut = upload.Data(b"I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence=b"99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
948            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
949
950            d1.addCallback(self.log, "doing get(ro)")
951            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
952            d1.addCallback(lambda filenode:
953                           self.failUnless(IFileNode.providedBy(filenode)))
954
955            d1.addCallback(self.log, "doing delete(ro)")
956            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
957
958            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
959
960            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
961
962            personal = self._personal_node
963            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
964
965            d1.addCallback(self.log, "doing move_child_to(ro)2")
966            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
967
968            d1.addCallback(self.log, "finished with _got_s2ro")
969            return d1
970        d.addCallback(_got_s2ro)
971        def _got_home(dummy):
972            home = self._private_node
973            personal = self._personal_node
974            d1 = defer.succeed(None)
975            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
976            d1.addCallback(lambda res:
977                           personal.move_child_to(u"sekrit data",home,u"sekrit"))
978
979            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
980            d1.addCallback(lambda res:
981                           home.move_child_to(u"sekrit", home, u"sekrit data"))
982
983            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
984            d1.addCallback(lambda res:
985                           home.move_child_to(u"sekrit data", personal))
986
987            d1.addCallback(lambda res: home.build_manifest().when_done())
988            d1.addCallback(self.log, "manifest")
989            #  five items:
990            # P/
991            # P/personal/
992            # P/personal/sekrit data
993            # P/s2-rw  (same as P/s2-ro)
994            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
995            d1.addCallback(lambda res:
996                           self.failUnlessEqual(len(res["manifest"]), 5))
997            d1.addCallback(lambda res: home.start_deep_stats().when_done())
998            def _check_stats(stats):
999                expected = {"count-immutable-files": 1,
1000                            "count-mutable-files": 0,
1001                            "count-literal-files": 1,
1002                            "count-files": 2,
1003                            "count-directories": 3,
1004                            "size-immutable-files": 112,
1005                            "size-literal-files": 23,
1006                            #"size-directories": 616, # varies
1007                            #"largest-directory": 616,
1008                            "largest-directory-children": 3,
1009                            "largest-immutable-file": 112,
1010                            }
1011                for k,v in list(expected.items()):
1012                    self.failUnlessEqual(stats[k], v,
1013                                         "stats[%s] was %s, not %s" %
1014                                         (k, stats[k], v))
1015                self.failUnless(stats["size-directories"] > 1300,
1016                                stats["size-directories"])
1017                self.failUnless(stats["largest-directory"] > 800,
1018                                stats["largest-directory"])
1019                self.failUnlessEqual(stats["size-files-histogram"],
1020                                     [ (11, 31, 1), (101, 316, 1) ])
1021            d1.addCallback(_check_stats)
1022            return d1
1023        d.addCallback(_got_home)
1024        return d
1025
1026    def shouldFail(self, res, expected_failure, which, substring=None):
1027        if isinstance(res, Failure):
1028            res.trap(expected_failure)
1029            if substring:
1030                self.failUnless(substring in str(res),
1031                                "substring '%s' not in '%s'"
1032                                % (substring, str(res)))
1033        else:
1034            self.fail("%s was supposed to raise %s, not get '%s'" %
1035                      (which, expected_failure, res))
1036
1037    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1038        assert substring is None or isinstance(substring, str)
1039        d = defer.maybeDeferred(callable, *args, **kwargs)
1040        def done(res):
1041            if isinstance(res, Failure):
1042                res.trap(expected_failure)
1043                if substring:
1044                    self.failUnless(substring in str(res),
1045                                    "substring '%s' not in '%s'"
1046                                    % (substring, str(res)))
1047            else:
1048                self.fail("%s was supposed to raise %s, not get '%s'" %
1049                          (which, expected_failure, res))
1050        d.addBoth(done)
1051        return d
1052
1053    def PUT(self, urlpath, data):
1054        return do_http("put", self.webish_url + urlpath, data=data)
1055
1056    def GET(self, urlpath):
1057        return do_http("get", self.webish_url + urlpath)
1058
1059    def POST(self, urlpath, use_helper=False, **fields):
1060        sepbase = b"boogabooga"
1061        sep = b"--" + sepbase
1062        form = []
1063        form.append(sep)
1064        form.append(b'Content-Disposition: form-data; name="_charset"')
1065        form.append(b'')
1066        form.append(b'UTF-8')
1067        form.append(sep)
1068        for name, value in fields.items():
1069            if isinstance(value, tuple):
1070                filename, value = value
1071                form.append(b'Content-Disposition: form-data; name="%s"; '
1072                            b'filename="%s"' % (name.encode("utf-8"), filename.encode("utf-8")))
1073            else:
1074                form.append(b'Content-Disposition: form-data; name="%s"' % name.encode("utf-8"))
1075            form.append(b'')
1076            form.append(b"%s" % (value,))
1077            form.append(sep)
1078        form[-1] += b"--"
1079        body = b""
1080        headers = {}
1081        if fields:
1082            body = b"\r\n".join(form) + b"\r\n"
1083            headers["content-type"] = "multipart/form-data; boundary=%s" % str(sepbase, "ascii")
1084        return self.POST2(urlpath, body, headers, use_helper)
1085
1086    def POST2(self, urlpath, body=b"", headers=None, use_helper=False):
1087        if headers is None:
1088            headers = {}
1089        if use_helper:
1090            url = self.helper_webish_url + urlpath
1091        else:
1092            url = self.webish_url + urlpath
1093        return do_http("post", url, data=body, headers=headers)
1094
1095    def _test_web(self, res):
1096        public = "uri/" + str(self._root_directory_uri, "ascii")
1097        d = self.GET("")
1098        def _got_welcome(page):
1099            html = page.replace('\n', ' ')
1100            connected_re = r'Connected to <span>%d</span>\s*of <span>%d</span> known storage servers' % (self.numclients, self.numclients)
1101            self.failUnless(re.search(connected_re, html),
1102                            "I didn't see the right '%s' message in:\n%s" % (connected_re, page))
1103            # nodeids/tubids don't have any regexp-special characters
1104            nodeid_re = r'<th>Node ID:</th>\s*<td title="TubID: %s">%s</td>' % (
1105                self.clients[0].get_long_tubid(), str(self.clients[0].get_long_nodeid(), "ascii"))
1106            self.failUnless(re.search(nodeid_re, html),
1107                            "I didn't see the right '%s' message in:\n%s" % (nodeid_re, page))
1108            self.failUnless("Helper: 0 active uploads" in page)
1109        d.addCallback(_got_welcome)
1110        d.addCallback(self.log, "done with _got_welcome")
1111
1112        # get the welcome page from the node that uses the helper too
1113        d.addCallback(lambda res: do_http("get", self.helper_webish_url))
1114        def _got_welcome_helper(page):
1115            soup = BeautifulSoup(page, 'html5lib')
1116            assert_soup_has_tag_with_attributes(
1117                self, soup, u"img",
1118                { u"alt": u"Connected", u"src": u"img/connected-yes.png" }
1119            )
1120            self.failUnlessIn("Not running helper", page)
1121        d.addCallback(_got_welcome_helper)
1122
1123        d.addCallback(lambda res: self.GET(public))
1124        d.addCallback(lambda res: self.GET(public + "/subdir1"))
1125        def _got_subdir1(page):
1126            # there ought to be an href for our file
1127            self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1128            self.failUnless(">mydata567</a>" in page)
1129        d.addCallback(_got_subdir1)
1130        d.addCallback(self.log, "done with _got_subdir1")
1131        d.addCallback(lambda res: self.GET(public + "/subdir1/mydata567"))
1132        def _got_data(page):
1133            self.failUnlessEqual(page.encode("utf-8"), self.data)
1134        d.addCallback(_got_data)
1135
1136        # download from a URI embedded in a URL
1137        d.addCallback(self.log, "_get_from_uri")
1138        def _get_from_uri(res):
1139            return self.GET("uri/%s?filename=%s" % (str(self.uri, "utf-8"), "mydata567"))
1140        d.addCallback(_get_from_uri)
1141        def _got_from_uri(page):
1142            self.failUnlessEqual(page.encode("utf-8"), self.data)
1143        d.addCallback(_got_from_uri)
1144
1145        # download from a URI embedded in a URL, second form
1146        d.addCallback(self.log, "_get_from_uri2")
1147        def _get_from_uri2(res):
1148            return self.GET("uri?uri=%s" % (str(self.uri, "utf-8"),))
1149        d.addCallback(_get_from_uri2)
1150        d.addCallback(_got_from_uri)
1151
1152        # download from a bogus URI, make sure we get a reasonable error
1153        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1154        @defer.inlineCallbacks
1155        def _get_from_bogus_uri(res):
1156            d1 = self.GET("uri/%s?filename=%s"
1157                          % (str(self.mangle_uri(self.uri), "utf-8"), "mydata567"))
1158            e = yield self.assertFailure(d1, Error)
1159            self.assertEquals(e.status, b"410")
1160        d.addCallback(_get_from_bogus_uri)
1161        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1162
1163        # upload a file with PUT
1164        d.addCallback(self.log, "about to try PUT")
1165        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1166                                           b"new.txt contents"))
1167        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1168        d.addCallback(self.failUnlessEqual, "new.txt contents")
1169        # and again with something large enough to use multiple segments,
1170        # and hopefully trigger pauseProducing too
1171        def _new_happy_semantics(ign):
1172            for c in self.clients:
1173                # these get reset somewhere? Whatever.
1174                c.encoding_params['happy'] = 1
1175        d.addCallback(_new_happy_semantics)
1176        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1177                                           b"big" * 500000)) # 1.5MB
1178        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1179        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1180
1181        # can we replace files in place?
1182        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1183                                           b"NEWER contents"))
1184        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1185        d.addCallback(self.failUnlessEqual, "NEWER contents")
1186
1187        # test unlinked POST
1188        d.addCallback(lambda res: self.POST("uri", t=b"upload",
1189                                            file=("new.txt", b"data" * 10000)))
1190        # and again using the helper, which exercises different upload-status
1191        # display code
1192        d.addCallback(lambda res: self.POST("uri", use_helper=True, t=b"upload",
1193                                            file=("foo.txt", b"data2" * 10000)))
1194
1195        # check that the status page exists
1196        d.addCallback(lambda res: self.GET("status"))
1197        def _got_status(res):
1198            # find an interesting upload and download to look at. LIT files
1199            # are not interesting.
1200            h = self.clients[0].get_history()
1201            for ds in h.list_all_download_statuses():
1202                if ds.get_size() > 200:
1203                    self._down_status = ds.get_counter()
1204            for us in h.list_all_upload_statuses():
1205                if us.get_size() > 200:
1206                    self._up_status = us.get_counter()
1207            rs = list(h.list_all_retrieve_statuses())[0]
1208            self._retrieve_status = rs.get_counter()
1209            ps = list(h.list_all_publish_statuses())[0]
1210            self._publish_status = ps.get_counter()
1211            us = list(h.list_all_mapupdate_statuses())[0]
1212            self._update_status = us.get_counter()
1213
1214            # and that there are some upload- and download- status pages
1215            return self.GET("status/up-%d" % self._up_status)
1216        d.addCallback(_got_status)
1217        def _got_up(res):
1218            return self.GET("status/down-%d" % self._down_status)
1219        d.addCallback(_got_up)
1220        def _got_down(res):
1221            return self.GET("status/mapupdate-%d" % self._update_status)
1222        d.addCallback(_got_down)
1223        def _got_update(res):
1224            return self.GET("status/publish-%d" % self._publish_status)
1225        d.addCallback(_got_update)
1226        def _got_publish(res):
1227            self.failUnlessIn("Publish Results", res)
1228            return self.GET("status/retrieve-%d" % self._retrieve_status)
1229        d.addCallback(_got_publish)
1230        def _got_retrieve(res):
1231            self.failUnlessIn("Retrieve Results", res)
1232        d.addCallback(_got_retrieve)
1233
1234        # check that the helper status page exists
1235        d.addCallback(lambda res: self.GET("helper_status"))
1236        def _got_helper_status(res):
1237            self.failUnless("Bytes Fetched:" in res)
1238            # touch a couple of files in the helper's working directory to
1239            # exercise more code paths
1240            workdir = os.path.join(self.getdir("client0"), "helper")
1241            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1242            f = open(incfile, "wb")
1243            f.write(b"small file")
1244            f.close()
1245            then = time.time() - 86400*3
1246            now = time.time()
1247            os.utime(incfile, (now, then))
1248            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1249            f = open(encfile, "wb")
1250            f.write(b"less small file")
1251            f.close()
1252            os.utime(encfile, (now, then))
1253        d.addCallback(_got_helper_status)
1254        # and that the json form exists
1255        d.addCallback(lambda res: self.GET("helper_status?t=json"))
1256        def _got_helper_status_json(res):
1257            data = json.loads(res)
1258            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1259                                 1)
1260            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1261            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1262            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1263                                 10)
1264            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1265            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1266            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1267                                 15)
1268        d.addCallback(_got_helper_status_json)
1269
1270        # and check that client[3] (which uses a helper but does not run one
1271        # itself) doesn't explode when you ask for its status
1272        d.addCallback(lambda res: do_http("get",
1273                                          self.helper_webish_url + "status/"))
1274        def _got_non_helper_status(res):
1275            self.failUnlessIn("Recent and Active Operations", res)
1276        d.addCallback(_got_non_helper_status)
1277
1278        # or for helper status with t=json
1279        d.addCallback(lambda res:
1280                      do_http("get",
1281                              self.helper_webish_url + "helper_status?t=json"))
1282        def _got_non_helper_status_json(res):
1283            data = json.loads(res)
1284            self.failUnlessEqual(data, {})
1285        d.addCallback(_got_non_helper_status_json)
1286
1287        # see if the statistics page exists
1288        d.addCallback(lambda res: self.GET("statistics"))
1289        def _got_stats(res):
1290            self.failUnlessIn("Operational Statistics", res)
1291            self.failUnlessIn('  "downloader.files_downloaded": 5,', res)
1292        d.addCallback(_got_stats)
1293        d.addCallback(lambda res: self.GET("statistics?t=json"))
1294        def _got_stats_json(res):
1295            data = json.loads(res)
1296            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1297            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1298        d.addCallback(_got_stats_json)
1299
1300        # TODO: mangle the second segment of a file, to test errors that
1301        # occur after we've already sent some good data, which uses a
1302        # different error path.
1303
1304        # TODO: download a URI with a form
1305        # TODO: create a directory by using a form
1306        # TODO: upload by using a form on the directory page
1307        #    url = base + "somedir/subdir1/freeform_post!!upload"
1308        # TODO: delete a file by using a button on the directory page
1309
1310        return d
1311
1312    @defer.inlineCallbacks
1313    def _test_runner(self, res):
1314        # exercise some of the diagnostic tools in runner.py
1315
1316        # find a share
1317        for (dirpath, dirnames, filenames) in os.walk(ensure_text(self.basedir)):
1318            if "storage" not in dirpath:
1319                continue
1320            if not filenames:
1321                continue
1322            pieces = dirpath.split(os.sep)
1323            if (len(pieces) >= 4
1324                and pieces[-4] == "storage"
1325                and pieces[-3] == "shares"):
1326                # we're sitting in .../storage/shares/$START/$SINDEX , and there
1327                # are sharefiles here
1328                filename = os.path.join(dirpath, filenames[0])
1329                # peek at the magic to see if it is a chk share
1330                with open(filename, "rb") as f:
1331                    if ShareFile.is_valid_header(f.read(32)):
1332                        break
1333        else:
1334            self.fail("unable to find any uri_extension files in %r"
1335                      % self.basedir)
1336        log.msg("test_system.SystemTest._test_runner using %r" % filename)
1337
1338        rc,output,err = yield run_cli("debug", "dump-share", "--offsets",
1339                                      unicode_to_argv(filename))
1340        self.failUnlessEqual(rc, 0)
1341
1342        # we only upload a single file, so we can assert some things about
1343        # its size and shares.
1344        self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1345        self.failUnlessIn("size: %d\n" % len(self.data), output)
1346        self.failUnlessIn("num_segments: 1\n", output)
1347        # segment_size is always a multiple of needed_shares
1348        self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1349        self.failUnlessIn("total_shares: 10\n", output)
1350        # keys which are supposed to be present
1351        for key in ("size", "num_segments", "segment_size",
1352                    "needed_shares", "total_shares",
1353                    "codec_name", "codec_params", "tail_codec_params",
1354                    #"plaintext_hash", "plaintext_root_hash",
1355                    "crypttext_hash", "crypttext_root_hash",
1356                    "share_root_hash", "UEB_hash"):
1357            self.failUnlessIn("%s: " % key, output)
1358        self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1359
1360        # now use its storage index to find the other shares using the
1361        # 'find-shares' tool
1362        sharedir, shnum = os.path.split(filename)
1363        storagedir, storage_index_s = os.path.split(sharedir)
1364        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1365        rc,out,err = yield run_cli("debug", "find-shares", storage_index_s,
1366                                   *nodedirs)
1367        self.failUnlessEqual(rc, 0)
1368        sharefiles = [sfn.strip() for sfn in out.splitlines()]
1369        self.failUnlessEqual(len(sharefiles), 10)
1370
1371        # also exercise the 'catalog-shares' tool
1372        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1373        rc,out,err = yield run_cli("debug", "catalog-shares", *nodedirs)
1374        self.failUnlessEqual(rc, 0)
1375        descriptions = [sfn.strip() for sfn in out.splitlines()]
1376        self.failUnlessEqual(len(descriptions), 30)
1377        matching = [line
1378                    for line in descriptions
1379                    if line.startswith("CHK %s " % storage_index_s)]
1380        self.failUnlessEqual(len(matching), 10)
1381
1382    def _test_cli(self, res):
1383        # run various CLI commands (in a thread, since they use blocking
1384        # network calls)
1385
1386        private_uri = self._private_node.get_uri()
1387        client0_basedir = self.getdir("client0")
1388
1389        nodeargs = [
1390            "--node-directory", client0_basedir,
1391            ]
1392
1393        d = defer.succeed(None)
1394
1395        # for compatibility with earlier versions, private/root_dir.cap is
1396        # supposed to be treated as an alias named "tahoe:". Start by making
1397        # sure that works, before we add other aliases.
1398
1399        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1400        f = open(root_file, "wb")
1401        f.write(private_uri)
1402        f.close()
1403
1404        @defer.inlineCallbacks
1405        def run(ignored, verb, *args, **kwargs):
1406            rc,out,err = yield run_cli(verb, *args, nodeargs=nodeargs, **kwargs)
1407            defer.returnValue((out,err))
1408
1409        def _check_ls(out_and_err, expected_children, unexpected_children=()):
1410            (out, err) = out_and_err
1411            self.failUnlessEqual(err, "")
1412            for s in expected_children:
1413                self.failUnless(s in out, (s,out))
1414            for s in unexpected_children:
1415                self.failIf(s in out, (s,out))
1416
1417        def _check_ls_root(out_and_err):
1418            (out, err) = out_and_err
1419            self.failUnless("personal" in out)
1420            self.failUnless("s2-ro" in out)
1421            self.failUnless("s2-rw" in out)
1422            self.failUnlessEqual(err, "")
1423
1424        # this should reference private_uri
1425        d.addCallback(run, "ls")
1426        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1427
1428        d.addCallback(run, "list-aliases")
1429        def _check_aliases_1(out_and_err):
1430            (out, err) = out_and_err
1431            self.failUnlessEqual(err, "")
1432            self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % str(private_uri, "ascii"))
1433        d.addCallback(_check_aliases_1)
1434
1435        # now that that's out of the way, remove root_dir.cap and work with
1436        # new files
1437        d.addCallback(lambda res: os.unlink(root_file))
1438        d.addCallback(run, "list-aliases")
1439        def _check_aliases_2(out_and_err):
1440            (out, err) = out_and_err
1441            self.failUnlessEqual(err, "")
1442            self.failUnlessEqual(out, "")
1443        d.addCallback(_check_aliases_2)
1444
1445        d.addCallback(run, "mkdir")
1446        def _got_dir(out_and_err ):
1447            (out, err) = out_and_err
1448            self.failUnless(uri.from_string_dirnode(out.strip()))
1449            return out.strip()
1450        d.addCallback(_got_dir)
1451        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1452
1453        d.addCallback(run, "list-aliases")
1454        def _check_aliases_3(out_and_err):
1455            (out, err) = out_and_err
1456            self.failUnlessEqual(err, "")
1457            self.failUnless("tahoe: " in out)
1458        d.addCallback(_check_aliases_3)
1459
1460        def _check_empty_dir(out_and_err):
1461            (out, err) = out_and_err
1462            self.failUnlessEqual(out, "")
1463            self.failUnlessEqual(err, "")
1464        d.addCallback(run, "ls")
1465        d.addCallback(_check_empty_dir)
1466
1467        def _check_missing_dir(out_and_err):
1468            # TODO: check that rc==2
1469            (out, err) = out_and_err
1470            self.failUnlessEqual(out, "")
1471            self.failUnlessEqual(err, "No such file or directory\n")
1472        d.addCallback(run, "ls", "bogus")
1473        d.addCallback(_check_missing_dir)
1474
1475        files = []
1476        datas = []
1477        for i in range(10):
1478            fn = os.path.join(self.basedir, "file%d" % i)
1479            files.append(fn)
1480            data = b"data to be uploaded: file%d\n" % i
1481            datas.append(data)
1482            with open(fn, "wb") as f:
1483                f.write(data)
1484
1485        def _check_stdout_against(out_and_err, filenum=None, data=None):
1486            (out, err) = out_and_err
1487            self.failUnlessEqual(err, "")
1488            if filenum is not None:
1489                self.failUnlessEqual(out, str(datas[filenum], "ascii"))
1490            if data is not None:
1491                self.failUnlessEqual(out, data)
1492
1493        # test all both forms of put: from a file, and from stdin
1494        #  tahoe put bar FOO
1495        d.addCallback(run, "put", files[0], "tahoe-file0")
1496        def _put_out(out_and_err):
1497            (out, err) = out_and_err
1498            self.failUnless("URI:LIT:" in out, out)
1499            self.failUnless("201 Created" in err, err)
1500            uri0 = out.strip()
1501            return run(None, "get", uri0)
1502        d.addCallback(_put_out)
1503        d.addCallback(lambda out_err: self.failUnlessEqual(out_err[0], str(datas[0], "ascii")))
1504
1505        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1506        #  tahoe put bar tahoe:FOO
1507        d.addCallback(run, "put", files[2], "tahoe:file2")
1508        d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1509        def _check_put_mutable(out_and_err):
1510            (out, err) = out_and_err
1511            self._mutable_file3_uri = out.strip()
1512        d.addCallback(_check_put_mutable)
1513        d.addCallback(run, "get", "tahoe:file3")
1514        d.addCallback(_check_stdout_against, 3)
1515
1516        #  tahoe put FOO
1517        STDIN_DATA = "This is the file to upload from stdin."
1518        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1519        #  tahoe put tahoe:FOO
1520        d.addCallback(run, "put", "-", "tahoe:from-stdin",
1521                      stdin="Other file from stdin.")
1522
1523        d.addCallback(run, "ls")
1524        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1525                                  "tahoe-file-stdin", "from-stdin"])
1526        d.addCallback(run, "ls", "subdir")
1527        d.addCallback(_check_ls, ["tahoe-file1"])
1528
1529        # tahoe mkdir FOO
1530        d.addCallback(run, "mkdir", "subdir2")
1531        d.addCallback(run, "ls")
1532        # TODO: extract the URI, set an alias with it
1533        d.addCallback(_check_ls, ["subdir2"])
1534
1535        # tahoe get: (to stdin and to a file)
1536        d.addCallback(run, "get", "tahoe-file0")
1537        d.addCallback(_check_stdout_against, 0)
1538        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1539        d.addCallback(_check_stdout_against, 1)
1540        outfile0 = os.path.join(self.basedir, "outfile0")
1541        d.addCallback(run, "get", "file2", outfile0)
1542        def _check_outfile0(out_and_err):
1543            (out, err) = out_and_err
1544            data = open(outfile0,"rb").read()
1545            self.failUnlessEqual(data, b"data to be uploaded: file2\n")
1546        d.addCallback(_check_outfile0)
1547        outfile1 = os.path.join(self.basedir, "outfile0")
1548        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1549        def _check_outfile1(out_and_err):
1550            (out, err) = out_and_err
1551            data = open(outfile1,"rb").read()
1552            self.failUnlessEqual(data, b"data to be uploaded: file1\n")
1553        d.addCallback(_check_outfile1)
1554
1555        d.addCallback(run, "unlink", "tahoe-file0")
1556        d.addCallback(run, "unlink", "tahoe:file2")
1557        d.addCallback(run, "ls")
1558        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1559
1560        d.addCallback(run, "ls", "-l")
1561        def _check_ls_l(out_and_err):
1562            (out, err) = out_and_err
1563            lines = out.split("\n")
1564            for l in lines:
1565                if "tahoe-file-stdin" in l:
1566                    self.failUnless(l.startswith("-r-- "), l)
1567                    self.failUnless(" %d " % len(STDIN_DATA) in l)
1568                if "file3" in l:
1569                    self.failUnless(l.startswith("-rw- "), l) # mutable
1570        d.addCallback(_check_ls_l)
1571
1572        d.addCallback(run, "ls", "--uri")
1573        def _check_ls_uri(out_and_err):
1574            (out, err) = out_and_err
1575            lines = out.split("\n")
1576            for l in lines:
1577                if "file3" in l:
1578                    self.failUnless(self._mutable_file3_uri in l)
1579        d.addCallback(_check_ls_uri)
1580
1581        d.addCallback(run, "ls", "--readonly-uri")
1582        def _check_ls_rouri(out_and_err):
1583            (out, err) = out_and_err
1584            lines = out.split("\n")
1585            for l in lines:
1586                if "file3" in l:
1587                    rw_uri = self._mutable_file3_uri
1588                    u = uri.from_string_mutable_filenode(rw_uri)
1589                    ro_uri = str(u.get_readonly().to_string(), "ascii")
1590                    self.failUnless(ro_uri in l)
1591        d.addCallback(_check_ls_rouri)
1592
1593
1594        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1595        d.addCallback(run, "ls")
1596        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1597
1598        d.addCallback(run, "ln", "tahoe-moved", "newlink")
1599        d.addCallback(run, "ls")
1600        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1601
1602        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1603        d.addCallback(run, "ls")
1604        d.addCallback(_check_ls, ["file3", "file3-copy"])
1605        d.addCallback(run, "get", "tahoe:file3-copy")
1606        d.addCallback(_check_stdout_against, 3)
1607
1608        # copy from disk into tahoe
1609        d.addCallback(run, "cp", files[4], "tahoe:file4")
1610        d.addCallback(run, "ls")
1611        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1612        d.addCallback(run, "get", "tahoe:file4")
1613        d.addCallback(_check_stdout_against, 4)
1614
1615        # copy from tahoe into disk
1616        target_filename = os.path.join(self.basedir, "file-out")
1617        d.addCallback(run, "cp", "tahoe:file4", target_filename)
1618        def _check_cp_out(out_and_err):
1619            (out, err) = out_and_err
1620            self.failUnless(os.path.exists(target_filename))
1621            got = open(target_filename,"rb").read()
1622            self.failUnlessEqual(got, datas[4])
1623        d.addCallback(_check_cp_out)
1624
1625        # copy from disk to disk (silly case)
1626        target2_filename = os.path.join(self.basedir, "file-out-copy")
1627        d.addCallback(run, "cp", target_filename, target2_filename)
1628        def _check_cp_out2(out_and_err):
1629            (out, err) = out_and_err
1630            self.failUnless(os.path.exists(target2_filename))
1631            got = open(target2_filename,"rb").read()
1632            self.failUnlessEqual(got, datas[4])
1633        d.addCallback(_check_cp_out2)
1634
1635        # copy from tahoe into disk, overwriting an existing file
1636        d.addCallback(run, "cp", "tahoe:file3", target_filename)
1637        def _check_cp_out3(out_and_err):
1638            (out, err) = out_and_err
1639            self.failUnless(os.path.exists(target_filename))
1640            got = open(target_filename,"rb").read()
1641            self.failUnlessEqual(got, datas[3])
1642        d.addCallback(_check_cp_out3)
1643
1644        # copy from disk into tahoe, overwriting an existing immutable file
1645        d.addCallback(run, "cp", files[5], "tahoe:file4")
1646        d.addCallback(run, "ls")
1647        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1648        d.addCallback(run, "get", "tahoe:file4")
1649        d.addCallback(_check_stdout_against, 5)
1650
1651        # copy from disk into tahoe, overwriting an existing mutable file
1652        d.addCallback(run, "cp", files[5], "tahoe:file3")
1653        d.addCallback(run, "ls")
1654        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1655        d.addCallback(run, "get", "tahoe:file3")
1656        d.addCallback(_check_stdout_against, 5)
1657
1658        # recursive copy: setup
1659        dn = os.path.join(self.basedir, "dir1")
1660        os.makedirs(dn)
1661        with open(os.path.join(dn, "rfile1"), "wb") as f:
1662            f.write(b"rfile1")
1663        with open(os.path.join(dn, "rfile2"), "wb") as f:
1664            f.write(b"rfile2")
1665        with open(os.path.join(dn, "rfile3"), "wb") as f:
1666            f.write(b"rfile3")
1667        sdn2 = os.path.join(dn, "subdir2")
1668        os.makedirs(sdn2)
1669        with open(os.path.join(sdn2, "rfile4"), "wb") as f:
1670            f.write(b"rfile4")
1671        with open(os.path.join(sdn2, "rfile5"), "wb") as f:
1672            f.write(b"rfile5")
1673
1674        # from disk into tahoe
1675        d.addCallback(run, "cp", "-r", dn, "tahoe:")
1676        d.addCallback(run, "ls")
1677        d.addCallback(_check_ls, ["dir1"])
1678        d.addCallback(run, "ls", "dir1")
1679        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1680                      ["rfile4", "rfile5"])
1681        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1682        d.addCallback(_check_ls, ["rfile4", "rfile5"],
1683                      ["rfile1", "rfile2", "rfile3"])
1684        d.addCallback(run, "get", "dir1/subdir2/rfile4")
1685        d.addCallback(_check_stdout_against, data="rfile4")
1686
1687        # and back out again
1688        dn_copy = os.path.join(self.basedir, "dir1-copy")
1689        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1690        def _check_cp_r_out(out_and_err):
1691            (out, err) = out_and_err
1692            def _cmp(name):
1693                old = open(os.path.join(dn, name), "rb").read()
1694                newfn = os.path.join(dn_copy, "dir1", name)
1695                self.failUnless(os.path.exists(newfn))
1696                new = open(newfn, "rb").read()
1697                self.failUnlessEqual(old, new)
1698            _cmp("rfile1")
1699            _cmp("rfile2")
1700            _cmp("rfile3")
1701            _cmp(os.path.join("subdir2", "rfile4"))
1702            _cmp(os.path.join("subdir2", "rfile5"))
1703        d.addCallback(_check_cp_r_out)
1704
1705        # and copy it a second time, which ought to overwrite the same files
1706        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1707
1708        # and again, only writing filecaps
1709        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1710        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1711        def _check_capsonly(out_and_err):
1712            # these should all be LITs
1713            (out, err) = out_and_err
1714            x = open(os.path.join(dn_copy2, "dir1", "subdir2", "rfile4")).read()
1715            y = uri.from_string_filenode(x)
1716            self.failUnlessEqual(y.data, b"rfile4")
1717        d.addCallback(_check_capsonly)
1718
1719        # and tahoe-to-tahoe
1720        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1721        d.addCallback(run, "ls")
1722        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1723        d.addCallback(run, "ls", "dir1-copy/dir1")
1724        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1725                      ["rfile4", "rfile5"])
1726        d.addCallback(run, "ls", "tahoe:dir1-copy/dir1/subdir2")
1727        d.addCallback(_check_ls, ["rfile4", "rfile5"],
1728                      ["rfile1", "rfile2", "rfile3"])
1729        d.addCallback(run, "get", "dir1-copy/dir1/subdir2/rfile4")
1730        d.addCallback(_check_stdout_against, data="rfile4")
1731
1732        # and copy it a second time, which ought to overwrite the same files
1733        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1734
1735        # tahoe_ls doesn't currently handle the error correctly: it tries to
1736        # JSON-parse a traceback.
1737##         def _ls_missing(res):
1738##             argv = nodeargs + ["ls", "bogus"]
1739##             return self._run_cli(argv)
1740##         d.addCallback(_ls_missing)
1741##         def _check_ls_missing((out,err)):
1742##             print("OUT", out)
1743##             print("ERR", err)
1744##             self.failUnlessEqual(err, "")
1745##         d.addCallback(_check_ls_missing)
1746
1747        return d
1748
1749    # In CI this test can be very slow, so give it a longer timeout:
1750    test_filesystem.timeout = 360  # type: ignore[attr-defined]
1751
1752
1753    def test_filesystem_with_cli_in_subprocess(self):
1754        # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1755
1756        d = self.set_up_nodes()
1757        def _new_happy_semantics(ign):
1758            for c in self.clients:
1759                c.encoding_params['happy'] = 1
1760        d.addCallback(_new_happy_semantics)
1761
1762        def _run_in_subprocess(ignored, verb, *args, **kwargs):
1763            stdin = kwargs.get("stdin")
1764            # XXX https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3548
1765            env = kwargs.get("env", os.environ)
1766            # Python warnings from the child process don't matter.
1767            env["PYTHONWARNINGS"] = "ignore"
1768            newargs = ["--node-directory", self.getdir("client0"), verb] + list(args)
1769            return self.run_bintahoe(newargs, stdin=stdin, env=env)
1770
1771        def _check_succeeded(res):
1772            out, err, rc_or_sig = res
1773            self.failUnlessEqual(rc_or_sig, 0, str(res))
1774
1775        d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1776        d.addCallback(_check_succeeded)
1777
1778        STDIN_DATA = b"This is the file to upload from stdin."
1779        d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1780        d.addCallback(_check_succeeded)
1781
1782        def _mv_with_http_proxy(ign):
1783            env = os.environ
1784            env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1785            return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1786        d.addCallback(_mv_with_http_proxy)
1787        d.addCallback(_check_succeeded)
1788
1789        d.addCallback(_run_in_subprocess, "ls", "newalias:")
1790        def _check_ls(res):
1791            out, err, rc_or_sig = res
1792            self.failUnlessEqual(rc_or_sig, 0, str(res))
1793            self.failUnlessIn(b"tahoe-moved", out)
1794            self.failIfIn(b"tahoe-file", out)
1795        d.addCallback(_check_ls)
1796        return d
1797
1798    def _test_checker(self, res):
1799        ut = upload.Data(b"too big to be literal" * 200, convergence=None)
1800        d = self._personal_node.add_file(u"big file", ut)
1801
1802        d.addCallback(lambda res: self._personal_node.check(Monitor()))
1803        def _check_dirnode_results(r):
1804            self.failUnless(r.is_healthy())
1805        d.addCallback(_check_dirnode_results)
1806        d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1807        d.addCallback(_check_dirnode_results)
1808
1809        d.addCallback(lambda res: self._personal_node.get(u"big file"))
1810        def _got_chk_filenode(n):
1811            self.failUnless(isinstance(n, ImmutableFileNode))
1812            d = n.check(Monitor())
1813            def _check_filenode_results(r):
1814                self.failUnless(r.is_healthy())
1815            d.addCallback(_check_filenode_results)
1816            d.addCallback(lambda res: n.check(Monitor(), verify=True))
1817            d.addCallback(_check_filenode_results)
1818            return d
1819        d.addCallback(_got_chk_filenode)
1820
1821        d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1822        def _got_lit_filenode(n):
1823            self.failUnless(isinstance(n, LiteralFileNode))
1824            d = n.check(Monitor())
1825            def _check_lit_filenode_results(r):
1826                self.failUnlessEqual(r, None)
1827            d.addCallback(_check_lit_filenode_results)
1828            d.addCallback(lambda res: n.check(Monitor(), verify=True))
1829            d.addCallback(_check_lit_filenode_results)
1830            return d
1831        d.addCallback(_got_lit_filenode)
1832        return d
1833
1834
1835class Connections(SystemTestMixin, unittest.TestCase):
1836    FORCE_FOOLSCAP_FOR_STORAGE = True
1837
1838    def test_rref(self):
1839        # The way the listening port is created is via
1840        # SameProcessStreamEndpointAssigner (allmydata.test.common), which then
1841        # makes an endpoint string parsed by AdoptedServerPort. The latter does
1842        # dup(fd), which results in the filedescriptor staying alive _until the
1843        # test ends_. That means that when we disown the service, we still have
1844        # the listening port there on the OS level! Just the resulting
1845        # connections aren't handled. So this test relies on aggressive
1846        # timeouts in the HTTP client and presumably some equivalent in
1847        # Foolscap, since connection refused does _not_ happen.
1848        self.basedir = "system/Connections/rref-foolscap-{}".format(
1849            self.FORCE_FOOLSCAP_FOR_STORAGE
1850        )
1851        d = self.set_up_nodes(2)
1852        def _start(ign):
1853            self.c0 = self.clients[0]
1854            nonclients = [s for s in self.c0.storage_broker.get_connected_servers()
1855                          if s.get_serverid() != self.c0.get_long_nodeid()]
1856            self.failUnlessEqual(len(nonclients), 1)
1857
1858            self.s1 = nonclients[0]  # s1 is the server, not c0
1859            self.s1_storage_server = self.s1.get_storage_server()
1860            self.assertIsNot(self.s1_storage_server, None)
1861            self.assertTrue(self.s1.is_connected())
1862        d.addCallback(_start)
1863
1864        # now shut down the server
1865        d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1866
1867        # kill any persistent http connections that might continue to work
1868        d.addCallback(lambda ign: self.close_idle_http_connections())
1869
1870        # and wait for the client to notice
1871        def _poll():
1872            return len(self.c0.storage_broker.get_connected_servers()) == 1
1873        d.addCallback(lambda ign: self.poll(_poll))
1874
1875        def _down(ign):
1876            self.assertFalse(self.s1.is_connected())
1877            storage_server = self.s1.get_storage_server()
1878            self.assertIsNot(storage_server, None)
1879            self.assertEqual(storage_server, self.s1_storage_server)
1880        d.addCallback(_down)
1881        return d
1882
1883
1884class HTTPSystemTest(SystemTest):
1885    """HTTP storage protocol variant of the system tests."""
1886
1887    FORCE_FOOLSCAP_FOR_STORAGE = False
1888
1889
1890
1891class HTTPConnections(Connections):
1892    """HTTP storage protocol variant of the connections tests."""
1893    FORCE_FOOLSCAP_FOR_STORAGE = False
1894
Note: See TracBrowser for help on using the repository browser.