source: trunk/src/allmydata/test/web/test_root.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 6.9 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import time
6import json
7
8from urllib.parse import (
9    quote,
10)
11
12from bs4 import (
13    BeautifulSoup,
14)
15
16from twisted.web.template import Tag
17from twisted.web.test.requesthelper import DummyRequest
18from twisted.application import service
19from testtools.twistedsupport import succeeded
20from twisted.internet.defer import (
21    inlineCallbacks,
22    succeed,
23)
24
25from ...storage_client import (
26    NativeStorageServer,
27    StorageFarmBroker,
28)
29from ...web.root import (
30    RootElement,
31    Root,
32)
33from ...util.connection_status import ConnectionStatus
34from ...crypto.ed25519 import (
35    create_signing_keypair,
36)
37from allmydata.web.root import URIHandler
38from allmydata.client import _Client
39
40from .common import (
41    assert_soup_has_tag_with_attributes,
42)
43from ..common_web import (
44    render,
45)
46from ..common import (
47    EMPTY_CLIENT_CONFIG,
48)
49
50from ..common import (
51    SyncTestCase,
52    AsyncTestCase,
53)
54
55from testtools.matchers import (
56    Equals,
57    Contains,
58    AfterPreprocessing,
59)
60
class RenderSlashUri(SyncTestCase):
    """
    Ensure that URIs starting with /uri?uri= only accept valid
    capabilities
    """

    def setUp(self):
        """
        Create the ``URIHandler`` under test around a placeholder client.
        """
        # A bare ``object()`` stands in for the client object here.
        self.client = object()
        self.res = URIHandler(self.client)
        super().setUp()

    @inlineCallbacks
    def test_valid_query_redirect(self):
        """
        A syntactically valid capability given in the ``uri`` query argument
        results in a redirect.
        """
        cap = (
            b"URI:CHK:nt2xxmrccp7sursd6yh2thhcky:"
            b"mukesarwdjxiyqsjinbfiiro6q7kgmmekocxfjcngh23oxwyxtzq:2:5:5874882"
        )
        query_args = {b"uri": [cap]}
        response_body = yield render(self.res, query_args)
        soup = BeautifulSoup(response_body, 'html5lib')
        # The redirect is expressed as an HTML <meta http-equiv="refresh"> tag.
        tag = assert_soup_has_tag_with_attributes(
            self,
            soup,
            "meta",
            {"http-equiv": "refresh"},
        )
        # The redirect target must carry the fully percent-encoded capability
        # (``safe=""`` makes ``quote`` escape "/" as well).
        self.assertThat(
            tag.attrs.get("content"),
            Contains(quote(cap, safe="")),
        )

    def test_invalid(self):
        """
        A syntactically invalid capability results in an error.
        """
        query_args = {b"uri": [b"not a capability"]}
        response_body = render(self.res, query_args)
        # ``render`` is not yielded here; its result is checked through the
        # ``succeeded`` matcher, with the value coerced to ``bytes`` before
        # comparison.
        self.assertThat(
            response_body,
            succeeded(AfterPreprocessing(bytes, Equals(b"Invalid capability"))),
        )
106
107
class RenderServiceRow(SyncTestCase):
    """
    Rendering of individual rows of the services table on the root page.
    """

    def test_missing(self):
        """
        A statically-configured server only has to supply
        anonymous-storage-FURL and permutation-seed-base32.  The WUI
        once had trouble rendering servers without a nickname or a
        version; check that such a minimal server renders with empty
        strings in those slots.
        """
        announcement = {
            "anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
            "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
        }

        def disconnected_status():
            # Build a fresh status object on every call.
            return ConnectionStatus(False, "summary", {}, 0, 0)

        server = NativeStorageServer(
            b"server_id",
            announcement,
            None,
            {},
            EMPTY_CLIENT_CONFIG,
        )
        server.get_connection_status = disconnected_status

        class FakeClient(_Client):
            def __init__(self):
                # Only the MultiService initializer is run, not _Client's.
                service.MultiService.__init__(self)
                broker = StorageFarmBroker(
                    permute_peers=True,
                    tub_maker=None,
                    node_config=EMPTY_CLIENT_CONFIG,
                )
                self.storage_broker = broker
                self.storage_broker.test_add_server(b"test-srv", server)

        element = RootElement(FakeClient(), time.time)
        request = DummyRequest(b"")
        tag = Tag(b"")

        # Render the services table and collect every row it yields.
        table = element.services_table(request, tag)
        rows = list(table.item(request, tag))

        # Only the first row is inspected.
        first_row = rows[0]

        for slot_name in ("version", "nickname"):
            self.assertThat(first_row.slotData.get(slot_name), Equals(""))
144
145
class RenderRoot(AsyncTestCase):
    """
    Tests for rendering the root ('welcome') resource.
    """

    @inlineCallbacks
    def test_root_json(self):
        """
        The 'welcome' / root page renders properly with ?t=json when some
        servers show None for available_space while others show a
        valid int

        See also https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3852
        """
        ann = {
            "anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
            "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
        }
        # srv0 keeps the default get_available_space; the expected JSON below
        # shows it reported as None.
        srv0 = NativeStorageServer(b"server_id0", ann, None, {}, EMPTY_CLIENT_CONFIG)
        srv0.get_connection_status = lambda: ConnectionStatus(False, "summary0", {}, 0, 0)

        srv1 = NativeStorageServer(b"server_id1", ann, None, {}, EMPTY_CLIENT_CONFIG)
        srv1.get_connection_status = lambda: ConnectionStatus(False, "summary1", {}, 0, 0)
        # arrange for this server to have some valid available space
        srv1.get_available_space = lambda: 12345

        class FakeClient(_Client):
            # NOTE(review): presumably these are the attributes the root page
            # reads while rendering -- confirm against allmydata.web.root.
            history = []
            stats_provider = object()
            nickname = ""
            nodeid = b"asdf"
            _node_public_key = create_signing_keypair()[1]
            introducer_clients = []
            helper = None

            def __init__(self):
                # Only the MultiService initializer is run, not _Client's.
                service.MultiService.__init__(self)
                self.storage_broker = StorageFarmBroker(
                    permute_peers=True,
                    tub_maker=None,
                    node_config=EMPTY_CLIENT_CONFIG,
                )
                self.storage_broker.test_add_server(b"test-srv0", srv0)
                self.storage_broker.test_add_server(b"test-srv1", srv1)

        root = Root(FakeClient(), now_fn=time.time)

        # Every chunk passed to req.write() is collected here.
        lines = []

        req = DummyRequest(b"")
        req.fields = {}
        req.args = {
            b"t": [b"json"],
        }

        # for some reason, DummyRequest is already finished when we
        # try to add a notifyFinish handler, so override that
        # behavior.

        def nop():
            return succeed(None)
        req.notifyFinish = nop
        req.write = lines.append

        yield root.render(req)

        raw_js = b"".join(lines).decode("utf8")
        js = json.loads(raw_js)
        servers = js["servers"]
        # ``assertEqual`` rather than the deprecated ``assertEquals`` alias,
        # which unittest.TestCase no longer provides as of Python 3.12.
        self.assertEqual(len(servers), 2)
        self.assertIn(
            {
                "connection_status": "summary0",
                "nodeid": "server_id0",
                "last_received_data": 0,
                "version": None,
                "available_space": None,
                "nickname": ""
            },
            servers
        )
        self.assertIn(
            {
                "connection_status": "summary1",
                "nodeid": "server_id1",
                "last_received_data": 0,
                "version": None,
                "available_space": 12345,
                "nickname": ""
            },
            servers
        )
Note: See TracBrowser for help on using the repository browser.