source: trunk/src/allmydata/client.py

Last change on this file was 00f82a4, checked in by Christopher R. Wood <chris@…>, at 2024-05-30T19:51:24Z

Expand type hints for create_dirnode

  • Property mode set to 100644
File size: 44.5 KB
Line 
1"""
2Functionality related to operating a Tahoe-LAFS node (client _or_ server).
3"""
4from __future__ import annotations
5
6import os
7import stat
8import time
9import weakref
10from typing import Optional, Iterable
11from base64 import urlsafe_b64encode
12from functools import partial
13from configparser import NoSectionError
14
15from six import ensure_text
16from foolscap.furl import (
17    decode_furl,
18)
19
20import attr
21from zope.interface import implementer
22
23from twisted.plugin import (
24    getPlugins,
25)
26from twisted.internet import reactor, defer
27from twisted.application import service
28from twisted.application.internet import TimerService
29from twisted.python.filepath import FilePath
30
31import allmydata
32from allmydata import node
33from allmydata.crypto import rsa, ed25519
34from allmydata.crypto.util import remove_prefix
35from allmydata.dirnode import DirectoryNode
36from allmydata.storage.server import StorageServer, FoolscapStorageServer
37from allmydata import storage_client
38from allmydata.immutable.upload import Uploader
39from allmydata.immutable.offloaded import Helper
40from allmydata.mutable.filenode import MutableFileNode
41from allmydata.introducer.client import IntroducerClient
42from allmydata.util import (
43    hashutil, base32, pollmixin, log, idlib,
44    yamlutil, configutil,
45    fileutil,
46)
47from allmydata.util.encodingutil import get_filesystem_encoding
48from allmydata.util.abbreviate import parse_abbreviated_size
49from allmydata.util.time_format import parse_duration, parse_date
50from allmydata.util.i2p_provider import create as create_i2p_provider
51from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider
52from allmydata.util.cputhreadpool import defer_to_thread
53from allmydata.util.deferredutil import async_to_deferred
54from allmydata.stats import StatsProvider
55from allmydata.history import History
56from allmydata.interfaces import (
57    IStatsProducer,
58    SDMF_VERSION,
59    MDMF_VERSION,
60    DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
61    IFoolscapStoragePlugin,
62    IAnnounceableStorageServer,
63)
64from allmydata.nodemaker import NodeMaker
65from allmydata.blacklist import Blacklist
66from allmydata.node import _Config
67
# Binary size units; each one is 1024 times the previous.
KiB = 1 << 10
MiB = KiB << 10
GiB = MiB << 10
TiB = GiB << 10
PiB = TiB << 10
73
74def _is_valid_section(section_name):
75    """
76    Check for valid dynamic configuration section names.
77
78    Currently considers all possible storage server plugin sections valid.
79    """
80    return (
81        section_name.startswith("storageserver.plugins.") or
82        section_name.startswith("storageclient.plugins.") or
83        section_name in ("grid_managers", "grid_manager_certificates")
84    )
85
86
# Validation data for the client-specific parts of the configuration: the
# statically known sections with the items allowed in each, plus the
# dynamically named sections accepted by ``_is_valid_section``.
_client_config = configutil.ValidConfiguration(
    static_valid_sections={
        "client": (
            "helper.furl",
            "introducer.furl",
            "key_generator.furl",
            "mutable.format",
            "peers.preferred",
            "shares.happy",
            "shares.needed",
            "shares.total",
            "shares._max_immutable_segment_size_for_testing",
            "storage.plugins",
            "force_foolscap",
        ),
        "storage": (
            "debug_discard",
            "enabled",
            "anonymous",
            "expire.cutoff_date",
            "expire.enabled",
            "expire.immutable",
            # "expire.mode" used to be listed twice here; once is enough.
            "expire.mode",
            "expire.mutable",
            "expire.override_lease_duration",
            "readonly",
            "reserved_space",
            "storage_dir",
            "plugins",
            "grid_management",
            "force_foolscap",
        ),
        "sftpd": (
            "accounts.file",
            "enabled",
            "host_privkey_file",
            "host_pubkey_file",
            "port",
        ),
        "helper": (
            "enabled",
        ),
    },
    is_valid_section=_is_valid_section,
    # Anything in a valid section is a valid item, for now.
    is_valid_item=lambda section, ignored: _is_valid_section(section),
)
135
136
def _valid_config():
    """
    Combine the node module's common valid configuration with the
    client-specific validation data in ``_client_config``.

    :returns: whatever ``update()`` on the common configuration returns.
    """
    return node._common_valid_config().update(_client_config)
140
# Contents of the README placed in newly created node directories; this is
# what create_client() hands to node.create_node_dir().
CLIENT_README = u"""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details.
"""
148
149
150
def _make_secret():
    """
    Generate a fresh random secret.

    :returns: ``hashutil.CRYPTO_VAL_SIZE`` random bytes, base32-encoded and
        followed by a newline.
    """
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(random_bytes)
    return encoded + b"\n"
157
158
class SecretHolder(object):
    """
    Hold a lease secret and a convergence secret and hand out the values
    derived from them.
    """

    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret, self._convergence_secret = (
            lease_secret,
            convergence_secret,
        )

    def get_renewal_secret(self):
        """Return the renewal secret hashed from the lease secret."""
        secret = hashutil.my_renewal_secret_hash(self._lease_secret)
        return secret

    def get_cancel_secret(self):
        """Return the cancel secret hashed from the lease secret."""
        secret = hashutil.my_cancel_secret_hash(self._lease_secret)
        return secret

    def get_convergence_secret(self):
        """Return the convergence secret exactly as it was given."""
        return self._convergence_secret
172
class KeyGenerator(object):
    """
    I make RSA keypairs for mutable files; every ``generate()`` call
    produces exactly one keypair.
    """

    @async_to_deferred
    async def generate(self) -> tuple[rsa.PublicKey, rsa.PrivateKey]:
        """
        Create a new 2048-bit keypair by running
        ``rsa.create_signing_keypair`` through ``defer_to_thread``.

        I return a Deferred that fires with a (verifyingkey, signingkey)
        pair.
        """
        # create_signing_keypair yields (private, public); callers of
        # generate() get the pair in the opposite order.
        signing_key, verifying_key = await defer_to_thread(
            rsa.create_signing_keypair, 2048
        )
        return verifying_key, signing_key
188
189
class Terminator(service.Service):
    """
    A service that keeps weak references to registered objects and calls
    ``stop()`` on each remaining one when the service itself is stopped.
    """

    def __init__(self):
        # Weak keys: being registered here does not keep an object alive.
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Remember ``c`` so that it is stopped together with this service."""
        self._clients[c] = None

    def stopService(self):
        """Call ``stop()`` on every registered object, then stop normally."""
        for client in self._clients.keys():
            client.stop()
        return service.Service.stopService(self)
199
200
def read_config(basedir, portnumfile, generated_files: Iterable=()):
    """
    Load and validate the configuration of a client-style Node.

    This is :func:`allmydata.node.read_config` with the client validation
    data supplied; see that function for what the parameters mean.

    :returns: :class:`allmydata.node._Config` instance
    """
    client_validation = _valid_config()
    return node.read_config(
        basedir,
        portnumfile,
        generated_files=generated_files,
        _valid_config=client_validation,
    )
214
215
# Client flavour of ``node.config_from_string``: the same callable with the
# client validation data already bound as ``_valid_config``.
config_from_string = partial(node.config_from_string, _valid_config=_valid_config())
220
221
def create_client(basedir=u".", _client_factory=None):
    """
    Build a new client instance (a subclass of Node) for a node directory.

    :param unicode basedir: the node directory (which may not exist yet)

    :param _client_factory: (for testing) a callable that returns an
        instance of :class:`allmydata.node.Node` (or a subclass). By default
        this is :class:`allmydata.client._Client`

    :returns: Deferred yielding an instance of :class:`allmydata.client._Client`;
        any exception raised while preparing the node directory or reading
        its configuration is delivered as a failed Deferred instead of
        being raised.
    """
    try:
        node.create_node_dir(basedir, CLIENT_README)
        client_config = read_config(basedir, u"client.port")
        # Asynchronous: this hands back a Deferred, not the client itself.
        client_d = create_client_from_config(
            client_config,
            _client_factory=_client_factory,
        )
    except Exception:
        # Called without arguments inside the handler so that the active
        # exception becomes the failure.
        return defer.fail()
    return client_d
244
245
@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
    """
    Build a new client instance (a subclass of Node) from an already-loaded
    configuration.  Most code should probably use `create_client` instead.

    :returns: Deferred yielding a _Client instance

    :param config: configuration instance (from read_config()) which
        encapsulates everything in the "node directory".

    :param _client_factory: for testing; the class to instantiate
        instead of _Client

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient
    """
    factory = _Client if _client_factory is None else _client_factory

    i2p_provider = create_i2p_provider(reactor, config)
    tor_provider = create_tor_provider(reactor, config)
    default_connection_handlers, foolscap_connection_handlers = (
        node.create_connection_handlers(config, i2p_provider, tor_provider)
    )
    tub_options = node.create_tub_options(config)

    main_tub = node.create_main_tub(
        config,
        tub_options,
        default_connection_handlers,
        foolscap_connection_handlers,
        i2p_provider,
        tor_provider,
    )

    introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
    storage_broker = create_storage_farm_broker(
        config,
        default_connection_handlers,
        foolscap_connection_handlers,
        tub_options,
        introducer_clients,
        tor_provider,
    )

    client = factory(
        config,
        main_tub,
        i2p_provider,
        tor_provider,
        introducer_clients,
        storage_broker,
    )

    # Storage is set up as a separate step once the client exists.  The
    # storage plugins need a reference to the client in order to initialize
    # themselves (in particular they may want the anonymous IStorageServer
    # implementation so that they need not duplicate its basic storage
    # functionality).  Eventually it may be better to create that
    # implementation first and pass it to both the storage plugin creation
    # and the client factory: that would stop a partially initialized client
    # from escaping the factory and would remove the circular dependency
    # between these objects.
    storage_plugins = yield _StoragePlugins.from_config(
        client.get_anonymous_storage_server,
        config,
    )
    client.init_storage(storage_plugins.announceable_storage_servers)

    # Attach the remaining services to the client.
    i2p_provider.setServiceParent(client)
    tor_provider.setServiceParent(client)
    for introducer_client in introducer_clients:
        introducer_client.setServiceParent(client)
    storage_broker.setServiceParent(client)
    defer.returnValue(client)
314
315
@attr.s
class _StoragePlugins(object):
    """
    Helpers for finding the storage plugins a node is configured to use and
    getting them ready.

    :ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
        announceable storage servers that should be used according to node
        configuration.
    """
    announceable_storage_servers = attr.ib()

    @classmethod
    @defer.inlineCallbacks
    def from_config(cls, get_anonymous_storage_server, config):
        """
        Load and configure storage plugins.

        :param get_anonymous_storage_server: A no-argument callable which
            returns the node's anonymous ``IStorageServer`` implementation.

        :param _Config config: The node's configuration.

        :raise configutil.UnknownConfigError: If the configuration enables a
            plugin that cannot be found.

        :return: A ``_StoragePlugins`` initialized from the given
            configuration.
        """
        enabled_names = cls._get_enabled_storage_plugin_names(config)
        plugins = list(cls._collect_storage_plugins(enabled_names))
        found_names = {plugin.name for plugin in plugins}
        missing_names = enabled_names - found_names
        if missing_names:
            message = "Storage plugins {} are enabled but not known on this system.".format(
                missing_names,
            )
            raise configutil.UnknownConfigError(message)
        servers = yield cls._create_plugin_storage_servers(
            get_anonymous_storage_server,
            config,
            plugins,
        )
        defer.returnValue(cls(servers))

    @classmethod
    def _get_enabled_storage_plugin_names(cls, config):
        """
        Read the comma-separated ``[storage]plugins`` item and return the
        set of names in it (the empty string is never included).
        """
        configured = config.get_config("storage", "plugins", "")
        names = set(configured.split(u","))
        names.discard(u"")
        return names

    @classmethod
    def _collect_storage_plugins(cls, storage_plugin_names):
        """
        Return the ``IFoolscapStoragePlugin`` plugins whose name is one of
        those given.
        """
        return [
            plugin
            for plugin in getPlugins(IFoolscapStoragePlugin)
            if plugin.name in storage_plugin_names
        ]

    @classmethod
    def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
        """
        Ask every given storage plugin for its storage server and collect
        the results.

        :return: A ``Deferred`` that fires with storage servers instantiated
            by all of the given storage server plugins.
        """
        pending = []
        # Sorting by name carries no meaning of its own; it only gives the
        # result *some* stable order, which makes the data easier to work
        # with in tests and when inspecting it by hand.
        for plugin in sorted(plugins, key=lambda p: p.name):
            plugin_config = cls._get_storage_plugin_configuration(config, plugin.name)
            pending.append(
                plugin.get_storage_server(
                    plugin_config,
                    get_anonymous_storage_server,
                ).addCallback(
                    # Record the plugin's name in the server's announcement.
                    partial(_add_to_announcement, {u"name": plugin.name}),
                )
            )
        return defer.gatherResults(pending)

    @classmethod
    def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
        """
        Load the configuration for a storage server plugin with the given name.

        :return dict: The items of the plugin's
            ``storageserver.plugins.<name>`` section, or an empty dict if
            the configuration has no such section.
        """
        section = "storageserver.plugins." + storage_plugin_name
        try:
            items = config.items(section)
        except NoSectionError:
            items = []
        return dict(items)
425
426
427
def _sequencer(config):
    """
    Produce the next announcement sequence number together with a nonce.

    The previous number is read from the "announcement-seqnum"
    configuration file (treated as 0 when that file yields nothing, so the
    first number handed out is 1) and the incremented value is written back.

    :returns: a 2-tuple ``(seqnum, nonce)``: the new sequence number as an
        int and a random nonce, which is the stripped output of
        ``_make_secret()``.
    """
    previous = config.get_config_from_file("announcement-seqnum") or u"0"
    seqnum = int(previous.strip()) + 1
    config.write_config_file("announcement-seqnum", "{}\n".format(seqnum))
    return seqnum, _make_secret().strip()
443
444
def create_introducer_clients(config, main_tub, _introducer_factory=None):
    """
    Create one introducer client for every introducer named in the
    configuration (as reported by ``config.get_introducer_configuration()``).

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient

    :returns: a list of IntroducerClient instances
    """
    factory = IntroducerClient if _introducer_factory is None else _introducer_factory
    introducers = config.get_introducer_configuration()

    # The configuration maps a petname to a (furl, cache_path) pair; only
    # the pair is needed here.
    return [
        factory(
            main_tub,
            furl.encode("ascii"),
            config.nickname,
            str(allmydata.__full_version__),
            str(_Client.OLDEST_SUPPORTED_VERSION),
            partial(_sequencer, config),
            cache_path,
        )
        for furl, cache_path in list(introducers.values())
    ]
474
475
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
    """
    Build the StorageFarmBroker used by Uploader/Downloader (and everybody
    else who wants to use storage servers).

    :param config: a _Config instance

    :param default_connection_handlers: default Foolscap handlers

    :param foolscap_connection_handlers: available/configured Foolscap
        handlers

    :param dict tub_options: how to configure our Tub

    :param list introducer_clients: IntroducerClient instances if
        we're connecting to any

    :param tor_provider: passed through to the StorageFarmBroker; may be
        ``None``
    """
    storage_client_config = storage_client.StorageClientConfig.from_node_config(
        config,
    )
    # Load every plugin the configuration mentions right away.  Doing this
    # before any storage-client is created lets us exit early if one of
    # them cannot be loaded.
    storage_client_config.get_configured_storage_plugins()

    def tub_creator(handler_overrides=None, **kwargs):
        # A missing override mapping means "no overrides".
        if handler_overrides is None:
            handler_overrides = {}
        return node.create_tub(
            tub_options,
            default_connection_handlers,
            foolscap_connection_handlers,
            handler_overrides=handler_overrides,
            **kwargs
        )

    # The broker itself, told about every introducer we have.
    broker = storage_client.StorageFarmBroker(
        permute_peers=True,
        tub_maker=tub_creator,
        node_config=config,
        storage_client_config=storage_client_config,
        default_connection_handlers=default_connection_handlers,
        tor_provider=tor_provider,
    )
    for introducer_client in introducer_clients:
        broker.use_introducer(introducer_client)
    return broker
523
524
525def _register_reference(key, config, tub, referenceable):
526    """
527    Register a referenceable in a tub with a stable fURL.
528
529    Stability is achieved by storing the fURL in the configuration the first
530    time and then reading it back on for future calls.
531
532    :param bytes key: An identifier for this reference which can be used to
533        identify its fURL in the configuration.
534
535    :param _Config config: The configuration to use for fURL persistence.
536
537    :param Tub tub: The tub in which to register the reference.
538
539    :param Referenceable referenceable: The referenceable to register in the
540        Tub.
541
542    :return bytes: The fURL at which the object is registered.
543    """
544    persisted_furl = config.get_private_config(
545        key,
546        default=None,
547    )
548    name = None
549    if persisted_furl is not None:
550        _, _, name = decode_furl(persisted_furl)
551    registered_furl = tub.registerReference(
552        referenceable,
553        name=name,
554    )
555    if persisted_furl is None:
556        config.write_private_config(key, registered_furl)
557    return registered_furl
558
559
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
    """
    A storage server paired with the announcement that goes with it.
    """
    # Mapping of announcement data (copied and extended by
    # ``_add_to_announcement``).
    announcement = attr.ib()
    # The storage server object the announcement belongs to.
    storage_server = attr.ib()
565
566
567
def _add_to_announcement(information, announceable_storage_server):
    """
    Build a new ``AnnounceableStorageServer`` that wraps the same storage
    server as ``announceable_storage_server`` and whose ``announcement`` is
    a copy of the original updated with ``information``.  The object passed
    in is left unmodified.
    """
    merged = announceable_storage_server.announcement.copy()
    merged.update(information)
    return AnnounceableStorageServer(
        announcement=merged,
        storage_server=announceable_storage_server.storage_server,
    )
580
581
def storage_enabled(config):
    """
    Report whether the given configuration has storage turned on.

    The ``[storage]enabled`` item is read as a boolean and defaults to
    ``True`` when it is not set.

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if storage is enabled, ``False`` otherwise.
    """
    enabled = config.get_config("storage", "enabled", True, boolean=True)
    return enabled
591
592
def anonymous_storage_enabled(config):
    """
    Report whether the given configuration allows anonymous access to
    storage.

    That requires storage itself to be enabled and the ``[storage]anonymous``
    item (a boolean defaulting to ``True``) to be true.

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if anonymous storage is enabled, ``False``
        otherwise.
    """
    enabled = storage_enabled(config)
    if not enabled:
        return enabled
    return config.get_config("storage", "anonymous", True, boolean=True)
606
607
608@implementer(IStatsProducer)
609class _Client(node.Node, pollmixin.PollMixin):
610    """
611    This class should be refactored; see
612    https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931
613    """
614
615    STOREDIR = 'storage'
616    NODETYPE = "client"
617    EXIT_TRIGGER_FILE = "exit_trigger"
618
619    # This means that if a storage server treats me as though I were a
620    # 1.0.0 storage client, it will work as they expect.
621    OLDEST_SUPPORTED_VERSION = "1.0.0"
622
623    # This is a dictionary of (needed, desired, total, max_segment_size). 'needed'
624    # is the number of shares required to reconstruct a file. 'desired' means
625    # that we will abort an upload unless we can allocate space for at least
626    # this many. 'total' is the total number of shares created by encoding.
627    # If everybody has room then this is is how many we will upload.
628    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
629                                   "happy": 7,
630                                   "n": 10,
631                                   "max_segment_size": DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
632                                   }
633
634    def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
635                 storage_farm_broker):
636        """
637        Use :func:`allmydata.client.create_client` to instantiate one of these.
638        """
639        node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)
640
641        self.started_timestamp = time.time()
642        self.logSource = "Client"
643        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
644
645        self.introducer_clients = introducer_clients
646        self.storage_broker = storage_farm_broker
647
648        self.init_stats_provider()
649        self.init_secrets()
650        self.init_node_key()
651        self._key_generator = KeyGenerator()
652        key_gen_furl = config.get_config("client", "key_generator.furl", None)
653        if key_gen_furl:
654            log.msg("[client]key_generator.furl= is now ignored, see #2783")
655        self.init_client()
656        self.load_static_servers()
657        self.helper = None
658        if config.get_config("helper", "enabled", False, boolean=True):
659            if not self._is_tub_listening():
660                raise ValueError("config error: helper is enabled, but tub "
661                                 "is not listening ('tub.port=' is empty)")
662            self.init_helper()
663        self.init_sftp_server()
664
665        # If the node sees an exit_trigger file, it will poll every second to see
666        # whether the file still exists, and what its mtime is. If the file does not
667        # exist or has not been modified for a given timeout, the node will exit.
668        exit_trigger_file = config.get_config_path(self.EXIT_TRIGGER_FILE)
669        if os.path.exists(exit_trigger_file):
670            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
671            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
672            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
673            exit_trigger.setServiceParent(self)
674
675        # this needs to happen last, so it can use getServiceNamed() to
676        # acquire references to StorageServer and other web-statusable things
677        webport = config.get_config("node", "web.port", None)
678        if webport:
679            self.init_web(webport) # strports string
680
681        # TODO this may be the wrong location for now? but as temporary measure
682        # it allows us to get NURLs for testing in test_istorageserver.py. This
683        # will eventually get fixed one way or another in
684        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3901. See also
685        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931 for the bigger
686        # picture issue.
687        self.storage_nurls : Optional[set] = None
688
689    def init_stats_provider(self):
690        self.stats_provider = StatsProvider(self)
691        self.stats_provider.setServiceParent(self)
692        self.stats_provider.register_producer(self)
693
694    def get_stats(self):
695        return { 'node.uptime': time.time() - self.started_timestamp }
696
697    def init_secrets(self):
698        # configs are always unicode
699        def _unicode_make_secret():
700            return str(_make_secret(), "ascii")
701        lease_s = self.config.get_or_create_private_config(
702            "secret", _unicode_make_secret).encode("utf-8")
703        lease_secret = base32.a2b(lease_s)
704        convergence_s = self.config.get_or_create_private_config(
705            'convergence', _unicode_make_secret).encode("utf-8")
706        self.convergence = base32.a2b(convergence_s)
707        self._secret_holder = SecretHolder(lease_secret, self.convergence)
708
709    def init_node_key(self):
710        # we only create the key once. On all subsequent runs, we re-use the
711        # existing key
712        def _make_key():
713            private_key, _ = ed25519.create_signing_keypair()
714            # Config values are always unicode:
715            return str(ed25519.string_from_signing_key(private_key) + b"\n", "utf-8")
716
717        private_key_str = self.config.get_or_create_private_config(
718            "node.privkey", _make_key).encode("utf-8")
719        private_key, public_key = ed25519.signing_keypair_from_string(private_key_str)
720        public_key_str = ed25519.string_from_verifying_key(public_key)
721        self.config.write_config_file("node.pubkey", public_key_str + b"\n", "wb")
722        self._node_private_key = private_key
723        self._node_public_key = public_key
724
725    def get_long_nodeid(self):
726        # this matches what IServer.get_longname() says about us elsewhere
727        vk_string = ed25519.string_from_verifying_key(self._node_public_key)
728        return remove_prefix(vk_string, b"pub-")
729
730    def get_long_tubid(self):
731        return idlib.nodeid_b2a(self.nodeid)
732
733    def get_web_service(self):
734        """
735        :return: a reference to our web server
736        """
737        return self.getServiceNamed("webish")
738
739    def _init_permutation_seed(self, ss):
740        seed = self.config.get_config_from_file("permutation-seed")
741        if not seed:
742            have_shares = ss.have_shares()
743            if have_shares:
744                # if the server has shares but not a recorded
745                # permutation-seed, then it has been around since pre-#466
746                # days, and the clients who uploaded those shares used our
747                # TubID as a permutation-seed. We should keep using that same
748                # seed to keep the shares in the same place in the permuted
749                # ring, so those clients don't have to perform excessive
750                # searches.
751                seed = base32.b2a(self.nodeid)
752            else:
753                # otherwise, we're free to use the more natural seed of our
754                # pubkey-based serverid
755                vk_string = ed25519.string_from_verifying_key(self._node_public_key)
756                vk_bytes = remove_prefix(vk_string, ed25519.PUBLIC_KEY_PREFIX)
757                seed = base32.b2a(vk_bytes)
758            self.config.write_config_file("permutation-seed", seed+b"\n", mode="wb")
759        return seed.strip()
760
761    def get_anonymous_storage_server(self):
762        """
763        Get the anonymous ``IStorageServer`` implementation for this node.
764
765        Note this will return an object even if storage is disabled on this
766        node (but the object will not be exposed, peers will not be able to
767        access it, and storage will remain disabled).
768
769        The one and only instance for this node is always returned.  It is
770        created first if necessary.
771        """
772        try:
773            ss = self.getServiceNamed(StorageServer.name)
774        except KeyError:
775            pass
776        else:
777            return ss
778
779        readonly = self.config.get_config("storage", "readonly", False, boolean=True)
780
781        config_storedir = self.get_config(
782            "storage", "storage_dir", self.STOREDIR,
783        )
784        storedir = self.config.get_config_path(config_storedir)
785
786        data = self.config.get_config("storage", "reserved_space", None)
787        try:
788            reserved = parse_abbreviated_size(data)
789        except ValueError:
790            log.msg("[storage]reserved_space= contains unparseable value %s"
791                    % data)
792            raise
793        if reserved is None:
794            reserved = 0
795        discard = self.config.get_config("storage", "debug_discard", False,
796                                         boolean=True)
797
798        expire = self.config.get_config("storage", "expire.enabled", False, boolean=True)
799        if expire:
800            mode = self.config.get_config("storage", "expire.mode") # require a mode
801        else:
802            mode = self.config.get_config("storage", "expire.mode", "age")
803
804        o_l_d = self.config.get_config("storage", "expire.override_lease_duration", None)
805        if o_l_d is not None:
806            o_l_d = parse_duration(o_l_d)
807
808        cutoff_date = None
809        if mode == "cutoff-date":
810            cutoff_date = self.config.get_config("storage", "expire.cutoff_date")
811            cutoff_date = parse_date(cutoff_date)
812
813        sharetypes = []
814        if self.config.get_config("storage", "expire.immutable", True, boolean=True):
815            sharetypes.append("immutable")
816        if self.config.get_config("storage", "expire.mutable", True, boolean=True):
817            sharetypes.append("mutable")
818        expiration_sharetypes = tuple(sharetypes)
819
820        ss = StorageServer(
821            storedir, self.nodeid,
822            reserved_space=reserved,
823            discard_storage=discard,
824            readonly_storage=readonly,
825            stats_provider=self.stats_provider,
826            expiration_enabled=expire,
827            expiration_mode=mode,
828            expiration_override_lease_duration=o_l_d,
829            expiration_cutoff_date=cutoff_date,
830            expiration_sharetypes=expiration_sharetypes,
831        )
832        ss.setServiceParent(self)
833        return ss
834
835    def init_storage(self, announceable_storage_servers):
836        # should we run a storage server (and publish it for others to use)?
837        if not storage_enabled(self.config):
838            return
839        if not self._is_tub_listening():
840            raise ValueError("config error: storage is enabled, but tub "
841                             "is not listening ('tub.port=' is empty)")
842
843        ss = self.get_anonymous_storage_server()
844        announcement = {
845            "permutation-seed-base32": self._init_permutation_seed(ss),
846        }
847
848        if anonymous_storage_enabled(self.config):
849            furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
850            furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
851            (_, _, swissnum) = decode_furl(furl)
852            if hasattr(self.tub.negotiationClass, "add_storage_server"):
853                nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
854                self.storage_nurls = nurls
855                # There is code in e.g. storage_client.py that checks if an
856                # announcement has changed. Since NURL order isn't meaningful,
857                # we don't want a change in the order to count as a change, so we
858                # send the NURLs as a set. CBOR supports sets, as does Foolscap.
859                announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = {n.to_text() for n in nurls}
860            announcement["anonymous-storage-FURL"] = furl
861
862        enabled_storage_servers = self._enable_storage_servers(
863            announceable_storage_servers,
864        )
865        storage_options = list(
866            storage_server.announcement
867            for storage_server
868            in enabled_storage_servers
869        )
870        plugins_announcement = {}
871        if storage_options:
872            # Only add the new key if there are any plugins enabled.
873            plugins_announcement[u"storage-options"] = storage_options
874
875        announcement.update(plugins_announcement)
876
877        if self.config.get_config("storage", "grid_management", default=False, boolean=True):
878            grid_manager_certificates = self.config.get_grid_manager_certificates()
879            announcement[u"grid-manager-certificates"] = grid_manager_certificates
880
881        # Note: certificates are not verified for validity here, but
882        # that may be useful. See:
883        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3977
884
885        for ic in self.introducer_clients:
886            ic.publish("storage", announcement, self._node_private_key)
887
888    def get_client_storage_plugin_web_resources(self):
889        """
890        Get all of the client-side ``IResource`` implementations provided by
891        enabled storage plugins.
892
893        :return dict[bytes, IResource provider]: The implementations.
894        """
895        return self.storage_broker.get_client_storage_plugin_web_resources(
896            self.config,
897        )
898
899    def _enable_storage_servers(self, announceable_storage_servers):
900        """
901        Register and announce the given storage servers.
902        """
903        for announceable in announceable_storage_servers:
904            yield self._enable_storage_server(announceable)
905
906    def _enable_storage_server(self, announceable_storage_server):
907        """
908        Register a storage server.
909        """
910        config_key = "storage-plugin.{}.furl".format(
911            # Oops, why don't I have a better handle on this value?
912            announceable_storage_server.announcement[u"name"],
913        )
914        furl = _register_reference(
915            config_key,
916            self.config,
917            self.tub,
918            announceable_storage_server.storage_server,
919        )
920        announceable_storage_server = _add_to_announcement(
921            {u"storage-server-FURL": furl},
922            announceable_storage_server,
923        )
924        return announceable_storage_server
925
926    def init_client(self):
927        helper_furl = self.config.get_config("client", "helper.furl", None)
928        if helper_furl in ("None", ""):
929            helper_furl = None
930
931        DEP = self.encoding_params
932        DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"]))
933        DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"]))
934        DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"]))
935        # At the moment this is only used for testing, thus the janky config
936        # attribute name.
937        DEP["max_segment_size"] = int(self.config.get_config(
938            "client",
939            "shares._max_immutable_segment_size_for_testing",
940            DEP["max_segment_size"])
941        )
942
943        # for the CLI to authenticate to local JSON endpoints
944        self._create_auth_token()
945
946        self.history = History(self.stats_provider)
947        self.terminator = Terminator()
948        self.terminator.setServiceParent(self)
949        uploader = Uploader(
950            helper_furl,
951            self.stats_provider,
952            self.history,
953        )
954        uploader.setServiceParent(self)
955        self.init_blacklist()
956        self.init_nodemaker()
957
958    def get_auth_token(self):
959        """
960        This returns a local authentication token, which is just some
961        random data in "api_auth_token" which must be echoed to API
962        calls.
963        """
964        return self.config.get_private_config(
965            'api_auth_token').encode("ascii")
966
967    def _create_auth_token(self):
968        """
969        Creates new auth-token data written to 'private/api_auth_token'.
970
971        This is intentionally re-created every time the node starts.
972        """
973        self.config.write_private_config(
974            'api_auth_token',
975            urlsafe_b64encode(os.urandom(32)) + b'\n',
976        )
977
978    def get_storage_broker(self):
979        return self.storage_broker
980
981    def load_static_servers(self):
982        """
983        Load the servers.yaml file if it exists, and provide the static
984        server data to the StorageFarmBroker.
985        """
986        fn = self.config.get_private_path("servers.yaml")
987        servers_filepath = FilePath(fn)
988        try:
989            with servers_filepath.open() as f:
990                servers_yaml = yamlutil.safe_load(f)
991            static_servers = servers_yaml.get("storage", {})
992            log.msg("found %d static servers in private/servers.yaml" %
993                    len(static_servers))
994            static_servers = {
995                ensure_text(key): value for (key, value) in static_servers.items()
996            }
997            self.storage_broker.set_static_servers(static_servers)
998        except EnvironmentError:
999            pass
1000
1001    def init_blacklist(self):
1002        fn = self.config.get_config_path("access.blacklist")
1003        self.blacklist = Blacklist(fn)
1004
1005    def init_nodemaker(self):
1006        default = self.config.get_config("client", "mutable.format", default="SDMF")
1007        if default.upper() == "MDMF":
1008            self.mutable_file_default = MDMF_VERSION
1009        else:
1010            self.mutable_file_default = SDMF_VERSION
1011        self.nodemaker = NodeMaker(self.storage_broker,
1012                                   self._secret_holder,
1013                                   self.get_history(),
1014                                   self.getServiceNamed("uploader"),
1015                                   self.terminator,
1016                                   self.get_encoding_parameters(),
1017                                   self.mutable_file_default,
1018                                   self._key_generator,
1019                                   self.blacklist)
1020
1021    def get_history(self):
1022        return self.history
1023
1024    def init_helper(self):
1025        self.helper = Helper(self.config.get_config_path("helper"),
1026                             self.storage_broker, self._secret_holder,
1027                             self.stats_provider, self.history)
1028        # TODO: this is confusing. BASEDIR/private/helper.furl is created by
1029        # the helper. BASEDIR/helper.furl is consumed by the client who wants
1030        # to use the helper. I like having the filename be the same, since
1031        # that makes 'cp' work smoothly, but the difference between config
1032        # inputs and generated outputs is hard to see.
1033        helper_furlfile = self.config.get_private_path("helper.furl").encode(get_filesystem_encoding())
1034        self.tub.registerReference(self.helper, furlFile=helper_furlfile)
1035
1036    def _get_tempdir(self):
1037        """
1038        Determine the path to the directory where temporary files for this node
1039        should be written.
1040
1041        :return bytes: The path which will exist and be a directory.
1042        """
1043        tempdir_config = self.config.get_config("node", "tempdir", "tmp")
1044        if isinstance(tempdir_config, bytes):
1045            tempdir_config = tempdir_config.decode('utf-8')
1046        tempdir = self.config.get_config_path(tempdir_config)
1047        if not os.path.exists(tempdir):
1048            fileutil.make_dirs(tempdir)
1049        return tempdir
1050
1051    def init_web(self, webport):
1052        self.log("init_web(webport=%s)", args=(webport,))
1053
1054        from allmydata.webish import WebishServer, anonymous_tempfile_factory
1055        nodeurl_path = self.config.get_config_path("node.url")
1056        staticdir_config = self.config.get_config("node", "web.static", "public_html")
1057        staticdir = self.config.get_config_path(staticdir_config)
1058        ws = WebishServer(
1059            self,
1060            webport,
1061            anonymous_tempfile_factory(self._get_tempdir()),
1062            nodeurl_path,
1063            staticdir,
1064        )
1065        ws.setServiceParent(self)
1066
1067    def init_sftp_server(self):
1068        if self.config.get_config("sftpd", "enabled", False, boolean=True):
1069            accountfile = self.config.get_config("sftpd", "accounts.file", None)
1070            if accountfile:
1071                accountfile = self.config.get_config_path(accountfile)
1072            sftp_portstr = self.config.get_config("sftpd", "port", "tcp:8022")
1073            pubkey_file = self.config.get_config("sftpd", "host_pubkey_file")
1074            privkey_file = self.config.get_config("sftpd", "host_privkey_file")
1075
1076            from allmydata.frontends import sftpd
1077            s = sftpd.SFTPServer(self, accountfile,
1078                                 sftp_portstr, pubkey_file, privkey_file)
1079            s.setServiceParent(self)
1080
1081    def _check_exit_trigger(self, exit_trigger_file):
1082        if os.path.exists(exit_trigger_file):
1083            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
1084            if mtime > time.time() - 120.0:
1085                return
1086            else:
1087                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
1088        else:
1089            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
1090        reactor.stop()
1091
1092    def get_encoding_parameters(self):
1093        return self.encoding_params
1094
1095    def introducer_connection_statuses(self):
1096        return [ic.connection_status() for ic in self.introducer_clients]
1097
1098    def connected_to_introducer(self):
1099        return any([ic.connected_to_introducer() for ic in self.introducer_clients])
1100
1101    def get_renewal_secret(self): # this will go away
1102        return self._secret_holder.get_renewal_secret()
1103
1104    def get_cancel_secret(self):
1105        return self._secret_holder.get_cancel_secret()
1106
1107    def debug_wait_for_client_connections(self, num_clients):
1108        """Return a Deferred that fires (with None) when we have connections
1109        to the given number of peers. Useful for tests that set up a
1110        temporary test network and need to know when it is safe to proceed
1111        with an upload or download."""
1112        def _check():
1113            return len(self.storage_broker.get_connected_servers()) >= num_clients
1114        d = self.poll(_check, 0.5)
1115        d.addCallback(lambda res: None)
1116        return d
1117
1118
1119    # these four methods are the primitives for creating filenodes and
1120    # dirnodes. The first takes a URI and produces a filenode or (new-style)
1121    # dirnode. The other three create brand-new filenodes/dirnodes.
1122
1123    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
1124        # This returns synchronously.
1125        # Note that it does *not* validate the write_uri and read_uri; instead we
1126        # may get an opaque node if there were any problems.
1127        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
1128
1129    def create_dirnode(
1130        self,
1131        initial_children: dict | None = None,
1132        version: int | None = None,
1133        *,
1134        unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None
1135    ) -> DirectoryNode:
1136        """
1137        Create a new directory.
1138
1139        :param initial_children: If given, a structured dict representing the
1140            initial content of the created directory. See
1141            `docs/frontends/webapi.rst` for examples.
1142
1143        :param version: If given, an int representing the mutable file format
1144            of the new object. Acceptable values are currently `SDMF_VERSION`
1145            or `MDMF_VERSION` (corresponding to 0 or 1, respectively, as
1146            defined in `allmydata.interfaces`). If no such value is provided,
1147            the default mutable format will be used (currently SDMF).
1148
1149        :param unique_keypair: an optional tuple containing the RSA public
1150            and private key to be used for the new directory. Typically, this
1151            value is omitted (in which case a new random keypair will be
1152            generated at creation time).
1153
1154            **Warning** This value independently determines the identity of
1155            the mutable object to create.  There cannot be two different
1156            mutable objects that share a keypair.  They will merge into one
1157            object (with undefined contents).
1158
1159        :return: A Deferred which will fire with a representation of the new
1160            directory after it has been created.
1161        """
1162        d = self.nodemaker.create_new_mutable_directory(
1163            initial_children,
1164            version=version,
1165            keypair=unique_keypair,
1166        )
1167        return d
1168
1169    def create_immutable_dirnode(self, children, convergence=None):
1170        return self.nodemaker.create_immutable_directory(children, convergence)
1171
1172    def create_mutable_file(
1173            self,
1174            contents: bytes | None = None,
1175            version: int | None = None,
1176            *,
1177            unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None,
1178    ) -> MutableFileNode:
1179        """
1180        Create *and upload* a new mutable object.
1181
1182        :param contents: If given, the initial contents for the new object.
1183
1184        :param version: If given, the mutable file format for the new object
1185            (otherwise a format will be chosen automatically).
1186
1187        :param unique_keypair: **Warning** This value independently determines
1188            the identity of the mutable object to create.  There cannot be two
1189            different mutable objects that share a keypair.  They will merge
1190            into one object (with undefined contents).
1191
1192            It is common to pass a None value (or not pass a valuye) for this
1193            parameter.  In these cases, a new random keypair will be
1194            generated.
1195
1196            If non-None, the given public/private keypair will be used for the
1197            new object.  The expected use-case is for implementing compliance
1198            tests.
1199
1200        :return: A Deferred which will fire with a representation of the new
1201            mutable object after it has been uploaded.
1202        """
1203        return self.nodemaker.create_mutable_file(contents,
1204                                                  version=version,
1205                                                  keypair=unique_keypair)
1206
1207    def upload(self, uploadable, reactor=None):
1208        uploader = self.getServiceNamed("uploader")
1209        return uploader.upload(uploadable, reactor=reactor)
Note: See TracBrowser for help on using the repository browser.