source: trunk/src/allmydata/test/test_storage_client.py

Last change on this file was 3a1e079, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-12-11T15:14:31Z

Pacify newer Mypy

  • Property mode set to 100644
File size: 29.7 KB
Line 
1"""
2Tests for allmydata.storage_client.
3"""
4
5from __future__ import annotations
6
7from json import (
8    loads,
9)
10import hashlib
11from typing import Union, Any, Optional
12
13from hyperlink import DecodedURL
14from fixtures import (
15    TempDir,
16)
17from testtools.content import (
18    text_content,
19)
20from testtools.matchers import (
21    MatchesAll,
22    IsInstance,
23    MatchesStructure,
24    Equals,
25    Is,
26    AfterPreprocessing,
27)
28
29from zope.interface import (
30    implementer,
31)
32from zope.interface.verify import (
33    verifyObject,
34)
35
36from hyperlink import (
37    URL,
38)
39
40import attr
41
42from twisted.internet.interfaces import (
43    IStreamClientEndpoint,
44    IProtocolFactory,
45)
46from twisted.application.service import (
47    Service,
48)
49
50from twisted.trial import unittest
51from twisted.internet.defer import (
52    Deferred,
53    inlineCallbacks,
54)
55from twisted.python.filepath import (
56    FilePath,
57)
58from twisted.internet.task import Clock
59
60from foolscap.api import (
61    Tub,
62)
63from foolscap.ipb import (
64    IConnectionHintHandler,
65)
66
67from allmydata.util.deferredutil import MultiFailure
68
69from .no_network import LocalWrapper
70from .common import (
71    EMPTY_CLIENT_CONFIG,
72    SyncTestCase,
73    AsyncTestCase,
74    UseTestPlugins,
75    UseNode,
76    SameProcessStreamEndpointAssigner,
77    MemoryIntroducerClient,
78)
79from .common_web import (
80    do_http,
81)
82from .storage_plugin import (
83    DummyStorageClient,
84)
85from allmydata.webish import (
86    WebishServer,
87)
88from allmydata.util import base32, yamlutil
89from allmydata.storage_client import (
90    IFoolscapStorageServer,
91    NativeStorageServer,
92    HTTPNativeStorageServer,
93    StorageFarmBroker,
94    StorageClientConfig,
95    MissingPlugin,
96    _FoolscapStorage,
97    _NullStorage,
98    _pick_a_http_server,
99    ANONYMOUS_STORAGE_NURLS,
100)
101from ..storage.server import (
102    StorageServer,
103)
104from ..client import config_from_string
105
106from allmydata.interfaces import (
107    IConnectionStatus,
108    IStorageServer,
109)
110
111SOME_FURL = "pb://abcde@nowhere/fake"
112
113
114class NativeStorageServerWithVersion(NativeStorageServer):  # type: ignore  # tahoe-lafs/ticket/3573
115    def __init__(self, version):
116        # note: these instances won't work for anything other than
117        # get_available_space() because we don't upcall
118        self.version = version
119    def get_version(self):
120        return self.version
121
122
123class TestNativeStorageServer(unittest.TestCase):
124    def test_get_available_space_new(self):
125        nss = NativeStorageServerWithVersion(
126            { b"http://allmydata.org/tahoe/protocols/storage/v1":
127                { b"maximum-immutable-share-size": 111,
128                  b"available-space": 222,
129                }
130            })
131        self.failUnlessEqual(nss.get_available_space(), 222)
132
133    def test_get_available_space_old(self):
134        nss = NativeStorageServerWithVersion(
135            { b"http://allmydata.org/tahoe/protocols/storage/v1":
136                { b"maximum-immutable-share-size": 111,
137                }
138            })
139        self.failUnlessEqual(nss.get_available_space(), 111)
140
141    def test_missing_nickname(self):
142        ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
143               "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
144               }
145        nss = NativeStorageServer(b"server_id", ann, None, {}, EMPTY_CLIENT_CONFIG)
146        self.assertEqual(nss.get_nickname(), "")
147
148
149class GetConnectionStatus(unittest.TestCase):
150    """
151    Tests for ``NativeStorageServer.get_connection_status``.
152    """
153    def test_unrecognized_announcement(self):
154        """
155        When ``NativeStorageServer`` is constructed with a storage announcement it
156        doesn't recognize, its ``get_connection_status`` nevertheless returns
157        an object which provides ``IConnectionStatus``.
158        """
159        # Pretty hard to recognize anything from an empty announcement.
160        ann = {}
161        nss = NativeStorageServer(b"server_id", ann, Tub, {}, EMPTY_CLIENT_CONFIG)
162        nss.start_connecting(lambda: None)
163        connection_status = nss.get_connection_status()
164        self.assertTrue(IConnectionStatus.providedBy(connection_status))
165
166
167class UnrecognizedAnnouncement(unittest.TestCase):
168    """
169    Tests for handling of announcements that aren't recognized and don't use
170    *anonymous-storage-FURL*.
171
172    Recognition failure is created by making up something completely novel for
173    these tests.  In real use, recognition failure would most likely come from
174    an announcement generated by a storage server plugin which is not loaded
175    in the client.
176    """
177    plugin_name = u"tahoe-lafs-testing-v1"
178    ann = {
179        u"storage-options": [
180            {
181                u"name": plugin_name,
182                u"any-parameter": 12345,
183            },
184        ],
185    }
186    server_id = b"abc"
187
188    def _tub_maker(self, overrides):
189        return Service()
190
191    def native_storage_server(self, config: Optional[StorageClientConfig] = None) -> NativeStorageServer:
192        """
193        Make a ``NativeStorageServer`` out of an unrecognizable announcement.
194        """
195        return NativeStorageServer(
196            self.server_id,
197            self.ann,
198            self._tub_maker,
199            {},
200            node_config=EMPTY_CLIENT_CONFIG,
201            config=config if config is not None else StorageClientConfig(),
202        )
203
204    def test_no_exceptions(self):
205        """
206        ``NativeStorageServer`` can be instantiated with an unrecognized
207        announcement.
208        """
209        self.native_storage_server()
210
211    def test_start_connecting(self):
212        """
213        ``NativeStorageServer.start_connecting`` does not raise an exception.
214        """
215        server = self.native_storage_server()
216        server.start_connecting(None)
217
218    def test_stop_connecting(self):
219        """
220        ``NativeStorageServer.stop_connecting`` does not raise an exception.
221        """
222        server = self.native_storage_server()
223        server.start_connecting(None)
224        server.stop_connecting()
225
226    def test_try_to_connect(self):
227        """
228        ``NativeStorageServer.try_to_connect`` does not raise an exception.
229        """
230        server = self.native_storage_server()
231        server.start_connecting(None)
232        server.try_to_connect()
233
234    def test_various_data_methods(self):
235        """
236        The data accessors of ``NativeStorageServer`` that depend on the
237        announcement do not raise an exception.
238        """
239        server = self.native_storage_server()
240        server.get_permutation_seed()
241        server.get_name()
242        server.get_longname()
243        server.get_tubid()
244        server.get_lease_seed()
245        server.get_foolscap_write_enabler_seed()
246        server.get_nickname()
247
248    def test_missing_plugin(self) -> None:
249        """
250        An exception is produced if the plugin is missing
251        """
252        with self.assertRaises(MissingPlugin):
253            self.native_storage_server(
254                StorageClientConfig(
255                    storage_plugins={
256                        "missing-plugin-name": {}
257                    }
258                )
259            )
260
261
262class PluginMatchedAnnouncement(SyncTestCase):
263    """
264    Tests for handling by ``NativeStorageServer`` of storage server
265    announcements that are handled by an ``IFoolscapStoragePlugin``.
266    """
267    @inlineCallbacks
268    def make_node(self, introducer_furl, storage_plugin, plugin_config):
269        """
270        Create a client node with the given configuration.
271
272        :param bytes introducer_furl: The introducer furl with which to
273            configure the client.
274
275        :param bytes storage_plugin: The name of a storage plugin to enable.
276
277        :param dict[bytes, bytes] plugin_config: Configuration to supply to
278            the enabled plugin.  May also be ``None`` for no configuration
279            section (distinct from ``{}`` which creates an empty configuration
280            section).
281        """
282        tempdir = TempDir()
283        self.useFixture(tempdir)
284        self.basedir = FilePath(tempdir.path)
285        self.basedir.child(u"private").makedirs()
286        self.useFixture(UseTestPlugins())
287
288        self.node_fixture = self.useFixture(UseNode(
289            plugin_config,
290            storage_plugin,
291            self.basedir,
292            introducer_furl,
293        ))
294        self.config = self.node_fixture.config
295        self.node = yield self.node_fixture.create_node()
296        [self.introducer_client] = self.node.introducer_clients
297
298
299    def publish(self, server_id, announcement, introducer_client):
300        for subscription in introducer_client.subscribed_to:
301            if subscription.service_name == u"storage":
302                subscription.cb(
303                    server_id,
304                    announcement,
305                    *subscription.args,
306                    **subscription.kwargs
307                )
308
309    def get_storage(self, server_id, node):
310        storage_broker = node.get_storage_broker()
311        native_storage_server = storage_broker.servers[server_id]
312        return native_storage_server._storage
313
314    def set_rref(self, server_id, node, rref):
315        storage_broker = node.get_storage_broker()
316        native_storage_server = storage_broker.servers[server_id]
317        native_storage_server._rref = rref
318
319    @inlineCallbacks
320    def test_ignored_non_enabled_plugin(self):
321        """
322        An announcement that could be matched by a plugin that is not enabled is
323        not matched.
324        """
325        yield self.make_node(
326            introducer_furl=SOME_FURL,
327            storage_plugin="tahoe-lafs-dummy-v1",
328            plugin_config=None,
329        )
330        server_id = b"v0-abcdef"
331        ann = {
332            u"service-name": u"storage",
333            u"storage-options": [{
334                # notice how the announcement is for a different storage plugin
335                # than the one that is enabled.
336                u"name": u"tahoe-lafs-dummy-v2",
337                u"storage-server-FURL": SOME_FURL,
338            }],
339        }
340        self.publish(server_id, ann, self.introducer_client)
341        storage = self.get_storage(server_id, self.node)
342        self.assertIsInstance(storage, _NullStorage)
343
344    @inlineCallbacks
345    def test_enabled_plugin(self):
346        """
347        An announcement that could be matched by a plugin that is enabled with
348        configuration is matched and the plugin's storage client is used.
349        """
350        plugin_config = {
351            "abc": "xyz",
352        }
353        plugin_name = "tahoe-lafs-dummy-v1"
354        yield self.make_node(
355            introducer_furl=SOME_FURL,
356            storage_plugin=plugin_name,
357            plugin_config=plugin_config,
358        )
359        server_id = b"v0-abcdef"
360        ann = {
361            u"service-name": u"storage",
362            u"storage-options": [{
363                # and this announcement is for a plugin with a matching name
364                u"name": plugin_name,
365                u"storage-server-FURL": SOME_FURL,
366            }],
367        }
368        self.publish(server_id, ann, self.introducer_client)
369        storage = self.get_storage(server_id, self.node)
370        self.assertTrue(
371            verifyObject(
372                IFoolscapStorageServer,
373                storage,
374            ),
375        )
376        expected_rref = object()
377        # Can't easily establish a real Foolscap connection so fake the result
378        # of doing so...
379        self.set_rref(server_id, self.node, expected_rref)
380        self.expectThat(
381            storage.storage_server,
382            MatchesAll(
383                IsInstance(DummyStorageClient),
384                MatchesStructure(
385                    get_rref=AfterPreprocessing(
386                        lambda get_rref: get_rref(),
387                        Is(expected_rref),
388                    ),
389                    configuration=Equals(plugin_config),
390                    announcement=Equals({
391                        u'name': plugin_name,
392                        u'storage-server-FURL': u'pb://abcde@nowhere/fake',
393                    }),
394                ),
395            ),
396        )
397
398    @inlineCallbacks
399    def test_enabled_no_configuration_plugin(self):
400        """
401        An announcement that could be matched by a plugin that is enabled with no
402        configuration is matched and the plugin's storage client is used.
403        """
404        plugin_name = "tahoe-lafs-dummy-v1"
405        yield self.make_node(
406            introducer_furl=SOME_FURL,
407            storage_plugin=plugin_name,
408            plugin_config=None,
409        )
410        server_id = b"v0-abcdef"
411        ann = {
412            u"service-name": u"storage",
413            u"storage-options": [{
414                # and this announcement is for a plugin with a matching name
415                u"name": plugin_name,
416                u"storage-server-FURL": SOME_FURL,
417            }],
418        }
419        self.publish(server_id, ann, self.introducer_client)
420        storage = self.get_storage(server_id, self.node)
421        self.addDetail("storage", text_content(str(storage)))
422        self.expectThat(
423            storage.storage_server,
424            MatchesAll(
425                IsInstance(DummyStorageClient),
426                MatchesStructure(
427                    configuration=Equals({}),
428                ),
429            ),
430        )
431
432
433class FoolscapStorageServers(unittest.TestCase):
434    """
435    Tests for implementations of ``IFoolscapStorageServer``.
436    """
437    def test_null_provider(self):
438        """
439        Instances of ``_NullStorage`` provide ``IFoolscapStorageServer``.
440        """
441        self.assertTrue(
442            verifyObject(
443                IFoolscapStorageServer,
444                _NullStorage(),
445            ),
446        )
447
448    def test_foolscap_provider(self):
449        """
450        Instances of ``_FoolscapStorage`` provide ``IFoolscapStorageServer``.
451        """
452        @implementer(IStorageServer)
453        class NotStorageServer(object):
454            pass
455        self.assertTrue(
456            verifyObject(
457                IFoolscapStorageServer,
458                _FoolscapStorage.from_announcement(
459                    b"server-id",
460                    SOME_FURL,
461                    {u"permutation-seed-base32": base32.b2a(b"permutationseed")},
462                    NotStorageServer(),
463                ),
464            ),
465        )
466
467
468class StoragePluginWebPresence(AsyncTestCase):
469    """
470    Tests for the web resources ``IFoolscapStorageServer`` plugins may expose.
471    """
472    @inlineCallbacks
473    def setUp(self):
474        super(StoragePluginWebPresence, self).setUp()
475
476        self.useFixture(UseTestPlugins())
477
478        self.port_assigner = SameProcessStreamEndpointAssigner()
479        self.port_assigner.setUp()
480        self.addCleanup(self.port_assigner.tearDown)
481        self.storage_plugin = u"tahoe-lafs-dummy-v1"
482
483        from twisted.internet import reactor
484        _, webport_endpoint = self.port_assigner.assign(reactor)
485        tubport_location, tubport_endpoint = self.port_assigner.assign(reactor)
486
487        tempdir = TempDir()
488        self.useFixture(tempdir)
489        self.basedir = FilePath(tempdir.path)
490        self.basedir.child(u"private").makedirs()
491        self.node_fixture = self.useFixture(UseNode(
492            plugin_config={
493                "web": "1",
494            },
495            node_config={
496                # We don't really need the main Tub listening but if we
497                # disable it then we also have to disable storage (because
498                # config validation policy).
499                "tub.port": tubport_endpoint,
500                "tub.location": tubport_location,
501                "web.port": str(webport_endpoint),
502            },
503            storage_plugin=self.storage_plugin,
504            basedir=self.basedir,
505            introducer_furl=SOME_FURL,
506        ))
507        self.node = yield self.node_fixture.create_node()
508        self.webish = self.node.getServiceNamed(WebishServer.name)
509        self.node.startService()
510        self.addCleanup(self.node.stopService)
511        self.port = self.webish.getPortnum()
512
513    @inlineCallbacks
514    def test_plugin_resource_path(self):
515        """
516        The plugin's resource is published at */storage-plugins/<plugin name>*.
517        """
518        url = u"http://127.0.0.1:{port}/storage-plugins/{plugin_name}".format(
519            port=self.port,
520            plugin_name=self.storage_plugin,
521        ).encode("utf-8")
522        result = yield do_http("get", url)
523        self.assertThat(loads(result), Equals({"web": "1"}))
524
525    @inlineCallbacks
526    def test_plugin_resource_persistent_across_requests(self):
527        """
528        The plugin's resource is loaded and then saved and re-used for future
529        requests.
530        """
531        url = URL(
532            scheme=u"http",
533            host=u"127.0.0.1",
534            port=self.port,
535            path=(
536                u"storage-plugins",
537                self.storage_plugin,
538                u"counter",
539            ),
540        ).to_text().encode("utf-8")
541        values = {
542            loads((yield do_http("get", url)))[u"value"],
543            loads((yield do_http("get", url)))[u"value"],
544        }
545        self.assertThat(
546            values,
547            # If the counter manages to go up then the state stuck around.
548            Equals({1, 2}),
549        )
550
551
552_aCertPEM = Tub().myCertificate.dumpPEM()
553def new_tub():
554    """
555    Make a new ``Tub`` with a hard-coded private key.
556    """
557    # Use a private key / certificate generated by Tub how it wants.  But just
558    # re-use the same one every time so we don't waste a lot of time
559    # generating them over and over in the tests.
560    return Tub(certData=_aCertPEM)
561
562
563def make_broker(tub_maker=None):
564    """
565    Create a ``StorageFarmBroker`` with the given tub maker and an empty
566    client configuration.
567    """
568    if tub_maker is None:
569        tub_maker = lambda handler_overrides: new_tub()
570    return StorageFarmBroker(True, tub_maker, EMPTY_CLIENT_CONFIG)
571
572
573@implementer(IStreamClientEndpoint)
574@attr.s
575class SpyEndpoint(object):
576    """
577    Observe and record connection attempts.
578
579    :ivar list _append: A callable that accepts two-tuples.  For each
580        attempted connection, it will be called with ``Deferred`` that was
581        returned and the ``Factory`` that was passed in.
582    """
583    _append = attr.ib()
584
585    def connect(self, factory):
586        """
587        Record the connection attempt.
588
589        :return: A ``Deferred`` that ``SpyEndpoint`` will not fire.
590        """
591        d = Deferred()
592        self._append((d, factory))
593        return d
594
595
596@implementer(IConnectionHintHandler)  # type: ignore # warner/foolscap#78
597@attr.s
598class SpyHandler(object):
599    """
600    A Foolscap connection hint handler for the "spy" hint type.  Connections
601    are handled by just observing and recording them.
602
603    :ivar list _connects: A list containing one element for each connection
604        attempted with this handler.  Each element is a two-tuple of the
605        ``Deferred`` that was returned from ``connect`` and the factory that
606        was passed to ``connect``.
607    """
608    _connects : list[tuple[Deferred[object], IProtocolFactory]]= attr.ib(default=attr.Factory(list))
609
610    def hint_to_endpoint(self, hint, reactor, update_status):
611        return (SpyEndpoint(self._connects.append), hint)
612
613
614class TestStorageFarmBroker(unittest.TestCase):
615
616    def test_static_servers(self):
617        broker = make_broker()
618
619        key_s = b'v0-1234-1'
620        servers_yaml = """\
621storage:
622  v0-1234-1:
623    ann:
624      anonymous-storage-FURL: {furl}
625      permutation-seed-base32: aaaaaaaaaaaaaaaaaaaaaaaa
626""".format(furl=SOME_FURL)
627        servers = yamlutil.safe_load(servers_yaml)
628        permseed = base32.a2b(b"aaaaaaaaaaaaaaaaaaaaaaaa")
629        broker.set_static_servers(servers["storage"])
630        self.failUnlessEqual(len(broker._static_server_ids), 1)
631        s = broker.servers[key_s]
632        self.failUnlessEqual(s.announcement,
633                             servers["storage"]["v0-1234-1"]["ann"])
634        self.failUnlessEqual(s.get_serverid(), key_s)
635        self.assertEqual(s.get_permutation_seed(), permseed)
636
637        # if the Introducer announces the same thing, we're supposed to
638        # ignore it
639
640        ann2 = {
641            "service-name": "storage",
642            "anonymous-storage-FURL": "pb://{}@nowhere/fake2".format(str(base32.b2a(b"1"), "utf-8")),
643            "permutation-seed-base32": "bbbbbbbbbbbbbbbbbbbbbbbb",
644        }
645        broker._got_announcement(key_s, ann2)
646        s2 = broker.servers[key_s]
647        self.assertIdentical(s2, s)
648        self.assertEqual(s2.get_permutation_seed(), permseed)
649
650    def test_upgrade_from_foolscap_to_http(self):
651        """
652        When an announcement is initially Foolscap but then switches to HTTP,
653        HTTP is used, assuming HTTP is enabled.
654        """
655        tub_maker = lambda _: new_tub()
656        config = config_from_string(
657            "/dev/null", "", "[client]\nforce_foolscap = False\n"
658        )
659        broker = StorageFarmBroker(True, tub_maker, config)
660        broker.startService()
661        self.addCleanup(broker.stopService)
662        key_s = b'v0-1234-1'
663
664        ones = str(base32.b2a(b"1"), "utf-8")
665        initial_announcement = {
666            "service-name": "storage",
667            "anonymous-storage-FURL": f"pb://{ones}@nowhere/fake2",
668            "permutation-seed-base32": "bbbbbbbbbbbbbbbbbbbbbbbb",
669        }
670        broker._got_announcement(key_s, initial_announcement)
671        initial_service = broker.servers[key_s]
672        self.assertIsInstance(initial_service, NativeStorageServer)
673        self.assertTrue(initial_service.running)
674        self.assertIdentical(initial_service.parent, broker)
675
676        http_announcement = initial_announcement.copy()
677        http_announcement[ANONYMOUS_STORAGE_NURLS] = {f"pb://{ones}@nowhere/fake2#v=1"}
678        broker._got_announcement(key_s, http_announcement)
679        self.assertFalse(initial_service.running)
680        self.assertEqual(initial_service.parent, None)
681        new_service = broker.servers[key_s]
682        self.assertIsInstance(new_service, HTTPNativeStorageServer)
683        self.assertTrue(new_service.running)
684        self.assertIdentical(new_service.parent, broker)
685
686
687    def test_static_permutation_seed_pubkey(self):
688        broker = make_broker()
689        server_id = b"v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
690        k = b"4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
691        ann = {
692            "anonymous-storage-FURL": SOME_FURL,
693        }
694        broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}})
695        s = broker.servers[server_id]
696        self.assertEqual(s.get_permutation_seed(), base32.a2b(k))
697
698    def test_static_permutation_seed_explicit(self):
699        broker = make_broker()
700        server_id = b"v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
701        k = b"w5gl5igiexhwmftwzhai5jy2jixn7yx7"
702        ann = {
703            "anonymous-storage-FURL": SOME_FURL,
704            "permutation-seed-base32": k,
705        }
706        broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}})
707        s = broker.servers[server_id]
708        self.assertEqual(s.get_permutation_seed(), base32.a2b(k))
709
710    def test_static_permutation_seed_hashed(self):
711        broker = make_broker()
712        server_id = b"unparseable"
713        ann = {
714            "anonymous-storage-FURL": SOME_FURL,
715        }
716        broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}})
717        s = broker.servers[server_id]
718        self.assertEqual(s.get_permutation_seed(),
719                         hashlib.sha256(server_id).digest())
720
721    @inlineCallbacks
722    def test_threshold_reached(self):
723        """
724        ``StorageFarmBroker.when_connected_enough`` returns a ``Deferred`` which
725        only fires after the ``StorageFarmBroker`` has established at least as
726        many connections as requested.
727        """
728        introducer = MemoryIntroducerClient(
729            new_tub(),
730            SOME_FURL,
731            b"",
732            None,
733            None,
734            None,
735            None,
736        )
737        new_tubs = []
738        def make_tub(*args, **kwargs):
739            return new_tubs.pop()
740        broker = make_broker(make_tub)
741        # Start the broker so that it will start Tubs attached to it so they
742        # will attempt to make connections as necessary so that we can observe
743        # those connections.
744        broker.startService()
745        self.addCleanup(broker.stopService)
746        done = broker.when_connected_enough(5)
747        broker.use_introducer(introducer)
748        # subscribes to "storage" to learn of new storage nodes
749        [subscribe] = introducer.subscribed_to
750        self.assertEqual(
751            subscribe.service_name,
752            "storage",
753        )
754        got_announcement = subscribe.cb
755
756        data = {
757            "service-name": "storage",
758            "anonymous-storage-FURL": None,
759            "permutation-seed-base32": "aaaaaaaaaaaaaaaaaaaaaaaa",
760        }
761
762        def add_one_server(x):
763            data["anonymous-storage-FURL"] = "pb://%s@spy:nowhere/fake" % (str(base32.b2a(b"%d" % x), "ascii"),)
764            tub = new_tub()
765            connects = []
766            spy = SpyHandler(connects)
767            tub.addConnectionHintHandler("spy", spy)
768            new_tubs.append(tub)
769            got_announcement(b'v0-1234-%d' % x, data)
770
771            self.assertEqual(
772                1, len(connects),
773                "Expected one connection attempt, got {!r} instead".format(connects),
774            )
775
776            # Skip over all the Foolscap negotiation.  It's complex with lots
777            # of pieces and I don't want to figure out how to fake
778            # it. -exarkun
779            native = broker.servers[b"v0-1234-%d" % (x,)]
780            rref = LocalWrapper(StorageServer(self.mktemp(), b"x" * 20))
781            native._got_connection(rref)
782
783        # first 4 shouldn't trigger connected_threashold
784        for x in range(4):
785            add_one_server(x)
786            self.assertFalse(done.called)
787
788        # ...but the 5th *should* trigger the threshold
789        add_one_server(42)
790
791        # so: the OneShotObserverList only notifies via
792        # foolscap.eventually() -- which forces the Deferred call
793        # through the reactor -- so it's no longer synchronous,
794        # meaning that we have to do "real reactor stuff" for the
795        # Deferred from when_connected_enough() to actually fire. (or
796        # @patch() out the reactor in foolscap.eventually to be a
797        # Clock() so we can advance time ourselves, but ... luckily
798        # eventually() uses 0 as the timeout currently)
799
800        yield done
801        self.assertTrue(done.called)
802
803    def test_should_we_use_http_default(self):
804        """Default is to use HTTP."""
805        basedir = self.mktemp()
806        node_config = config_from_string(basedir, "", "")
807        announcement = {ANONYMOUS_STORAGE_NURLS: ["pb://..."]}
808        self.assertTrue(
809            StorageFarmBroker._should_we_use_http(node_config, announcement)
810        )
811        # Lacking NURLs, we can't use HTTP:
812        self.assertFalse(
813            StorageFarmBroker._should_we_use_http(node_config, {})
814        )
815
816    def test_should_we_use_http(self):
817        """
818        If HTTP is allowed, it will only be used if the announcement includes
819        some NURLs.
820        """
821        basedir = self.mktemp()
822
823        no_nurls = {}
824        empty_nurls = {ANONYMOUS_STORAGE_NURLS: []}
825        has_nurls = {ANONYMOUS_STORAGE_NURLS: ["pb://.."]}
826
827        for force_foolscap, announcement, expected_http_usage in [
828                ("false", no_nurls, False),
829                ("false", empty_nurls, False),
830                ("false", has_nurls, True),
831                ("true", empty_nurls, False),
832                ("true", no_nurls, False),
833                ("true", has_nurls, False),
834        ]:
835            node_config = config_from_string(
836                basedir, "", f"[client]\nforce_foolscap = {force_foolscap}"
837            )
838            self.assertEqual(
839                StorageFarmBroker._should_we_use_http(node_config, announcement),
840                expected_http_usage
841            )
842
843
844class PickHTTPServerTests(unittest.SynchronousTestCase):
845    """Tests for ``_pick_a_http_server``."""
846
847    def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]:
848        """
849        Given mapping of URLs to (delay, result), return the URL of the
850        first selected server, or None.
851        """
852        clock = Clock()
853
854        def request(reactor, url):
855            delay, value = url_to_results[url]
856            result = Deferred()
857            def add_result_value():
858                if isinstance(value, Exception):
859                    result.errback(value)
860                else:
861                    result.callback(value)
862            reactor.callLater(delay, add_result_value)
863            return result
864
865        d = _pick_a_http_server(clock, list(url_to_results.keys()), request)
866        for i in range(100):
867            clock.advance(0.1)
868        return d
869
870    def test_first_successful_connect_is_picked(self):
871        """
872        Given multiple good URLs, the first one that connects is chosen.
873        """
874        earliest_url = DecodedURL.from_text("http://a")
875        latest_url = DecodedURL.from_text("http://b")
876        bad_url = DecodedURL.from_text("http://bad")
877        result = self.pick_result({
878            latest_url: (2, None),
879            earliest_url: (1, None),
880            bad_url: (0.5, RuntimeError()),
881        })
882        self.assertEqual(self.successResultOf(result), earliest_url)
883
884    def test_failures_include_all_reasons(self):
885        """
886        If all the requests fail, ``_pick_a_http_server`` raises a
887        ``allmydata.util.deferredutil.MultiFailure``.
888        """
889        eventually_good_url = DecodedURL.from_text("http://good")
890        bad_url = DecodedURL.from_text("http://bad")
891        exception1 = RuntimeError()
892        exception2 = ZeroDivisionError()
893        result = self.pick_result({
894            eventually_good_url: (1, exception1),
895            bad_url: (0.1, exception2),
896        })
897        exc = self.failureResultOf(result).value
898        self.assertIsInstance(exc, MultiFailure)
899        self.assertEqual({f.value for f in exc.failures}, {exception2, exception1})
Note: See TracBrowser for help on using the repository browser.