1 | """ |
---|
2 | Functionality related to operating a Tahoe-LAFS node (client _or_ server). |
---|
3 | """ |
---|
4 | from __future__ import annotations |
---|
5 | |
---|
6 | import os |
---|
7 | import stat |
---|
8 | import time |
---|
9 | import weakref |
---|
10 | from typing import Optional, Iterable |
---|
11 | from base64 import urlsafe_b64encode |
---|
12 | from functools import partial |
---|
13 | from configparser import NoSectionError |
---|
14 | |
---|
15 | from six import ensure_text |
---|
16 | from foolscap.furl import ( |
---|
17 | decode_furl, |
---|
18 | ) |
---|
19 | |
---|
20 | import attr |
---|
21 | from zope.interface import implementer |
---|
22 | |
---|
23 | from twisted.plugin import ( |
---|
24 | getPlugins, |
---|
25 | ) |
---|
26 | from twisted.internet import reactor, defer |
---|
27 | from twisted.application import service |
---|
28 | from twisted.application.internet import TimerService |
---|
29 | from twisted.python.filepath import FilePath |
---|
30 | |
---|
31 | import allmydata |
---|
32 | from allmydata import node |
---|
33 | from allmydata.crypto import rsa, ed25519 |
---|
34 | from allmydata.crypto.util import remove_prefix |
---|
35 | from allmydata.dirnode import DirectoryNode |
---|
36 | from allmydata.storage.server import StorageServer, FoolscapStorageServer |
---|
37 | from allmydata import storage_client |
---|
38 | from allmydata.immutable.upload import Uploader |
---|
39 | from allmydata.immutable.offloaded import Helper |
---|
40 | from allmydata.mutable.filenode import MutableFileNode |
---|
41 | from allmydata.introducer.client import IntroducerClient |
---|
42 | from allmydata.util import ( |
---|
43 | hashutil, base32, pollmixin, log, idlib, |
---|
44 | yamlutil, configutil, |
---|
45 | fileutil, |
---|
46 | ) |
---|
47 | from allmydata.util.encodingutil import get_filesystem_encoding |
---|
48 | from allmydata.util.abbreviate import parse_abbreviated_size |
---|
49 | from allmydata.util.time_format import parse_duration, parse_date |
---|
50 | from allmydata.util.i2p_provider import create as create_i2p_provider |
---|
51 | from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider |
---|
52 | from allmydata.util.cputhreadpool import defer_to_thread |
---|
53 | from allmydata.util.deferredutil import async_to_deferred |
---|
54 | from allmydata.stats import StatsProvider |
---|
55 | from allmydata.history import History |
---|
56 | from allmydata.interfaces import ( |
---|
57 | IStatsProducer, |
---|
58 | SDMF_VERSION, |
---|
59 | MDMF_VERSION, |
---|
60 | DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, |
---|
61 | IFoolscapStoragePlugin, |
---|
62 | IAnnounceableStorageServer, |
---|
63 | ) |
---|
64 | from allmydata.nodemaker import NodeMaker |
---|
65 | from allmydata.blacklist import Blacklist |
---|
66 | from allmydata.node import _Config |
---|
67 | |
---|
# Binary size constants, used when parsing/expressing byte counts.
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB
---|
73 | |
---|
74 | def _is_valid_section(section_name): |
---|
75 | """ |
---|
76 | Check for valid dynamic configuration section names. |
---|
77 | |
---|
78 | Currently considers all possible storage server plugin sections valid. |
---|
79 | """ |
---|
80 | return ( |
---|
81 | section_name.startswith("storageserver.plugins.") or |
---|
82 | section_name.startswith("storageclient.plugins.") or |
---|
83 | section_name in ("grid_managers", "grid_manager_certificates") |
---|
84 | ) |
---|
85 | |
---|
86 | |
---|
# The client-specific sections/items of tahoe.cfg considered valid; combined
# with the common node validator by _valid_config() below.
# Fix: the "storage" tuple listed "expire.mode" twice; the duplicate entry
# was redundant and has been removed (the accepted set is unchanged).
_client_config = configutil.ValidConfiguration(
    static_valid_sections={
        "client": (
            "helper.furl",
            "introducer.furl",
            "key_generator.furl",
            "mutable.format",
            "peers.preferred",
            "shares.happy",
            "shares.needed",
            "shares.total",
            "shares._max_immutable_segment_size_for_testing",
            "storage.plugins",
            "force_foolscap",
        ),
        "storage": (
            "debug_discard",
            "enabled",
            "anonymous",
            "expire.cutoff_date",
            "expire.enabled",
            "expire.immutable",
            "expire.mode",
            "expire.mutable",
            "expire.override_lease_duration",
            "readonly",
            "reserved_space",
            "storage_dir",
            "plugins",
            "grid_management",
            "force_foolscap",
        ),
        "sftpd": (
            "accounts.file",
            "enabled",
            "host_privkey_file",
            "host_pubkey_file",
            "port",
        ),
        "helper": (
            "enabled",
        ),
    },
    is_valid_section=_is_valid_section,
    # Anything in a valid section is a valid item, for now.
    is_valid_item=lambda section, ignored: _is_valid_section(section),
)
---|
135 | |
---|
136 | |
---|
def _valid_config():
    """
    Build the configuration validator for client-style nodes: the common
    node validator extended with the client-specific sections above.
    """
    common = node._common_valid_config()
    return common.update(_client_config)
---|
140 | |
---|
# This text is written into a README file inside newly-created node
# directories (see create_client / node.create_node_dir).
CLIENT_README = u"""
This directory contains files which contain private data for the Tahoe node,
such as private keys. On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files. See the 'configuration.rst' documentation file for details.
"""
---|
148 | |
---|
149 | |
---|
150 | |
---|
def _make_secret():
    """
    Return a base32-encoded random secret of hashutil.CRYPTO_VAL_SIZE
    bytes, with a trailing newline appended.
    """
    entropy = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(entropy) + b"\n"
---|
157 | |
---|
158 | |
---|
class SecretHolder(object):
    """
    Hold a node's lease secret and convergence secret, and derive the
    lease renewal/cancel secrets from the former on demand.
    """

    def __init__(self, lease_secret, convergence_secret):
        # master lease secret; renewal and cancel secrets are hashed from it
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        """Return the lease-renewal secret derived from the lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Return the lease-cancel secret derived from the lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        """Return the stored convergence secret, unmodified."""
        return self._convergence_secret
---|
172 | |
---|
class KeyGenerator(object):
    """
    Create RSA keypairs for mutable files; each call to generate() produces
    a single keypair.
    """

    @async_to_deferred
    async def generate(self) -> tuple[rsa.PublicKey, rsa.PrivateKey]:
        """
        Return a Deferred that fires with a (verifyingkey, signingkey)
        pair. The returned key will be 2048 bit.
        """
        # Key generation is CPU-heavy, so run it off the reactor thread.
        signing_key, verifying_key = await defer_to_thread(
            rsa.create_signing_keypair, 2048,
        )
        return verifying_key, signing_key
---|
188 | |
---|
189 | |
---|
class Terminator(service.Service):
    """
    Track registered objects weakly and call ``stop()`` on each of them
    when this service is stopped.
    """
    def __init__(self):
        # Weak keys: being registered here must not keep an object alive.
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Arrange for ``c.stop()`` to be called when this service stops."""
        self._clients[c] = None

    def stopService(self):
        # Snapshot the keys before iterating: iterating a WeakKeyDictionary
        # directly can raise RuntimeError if garbage collection removes an
        # entry while the loop is in progress.
        for c in list(self._clients):
            c.stop()
        return service.Service.stopService(self)
---|
199 | |
---|
200 | |
---|
def read_config(basedir, portnumfile, generated_files: Iterable = ()):
    """
    Read and validate configuration for a client-style Node.

    See :method:`allmydata.node.read_config` for parameter meanings; the
    only difference here is that client-specific validation data is passed.

    :returns: :class:`allmydata.node._Config` instance
    """
    return node.read_config(
        basedir,
        portnumfile,
        generated_files=generated_files,
        _valid_config=_valid_config(),
    )
---|
214 | |
---|
215 | |
---|
# Client-flavoured variant of node.config_from_string: same signature, but
# with the client validation rules bound in.  Note _valid_config() is
# evaluated once, at import time.
config_from_string = partial(
    node.config_from_string,
    _valid_config=_valid_config(),
)
---|
220 | |
---|
221 | |
---|
def create_client(basedir=u".", _client_factory=None):
    """
    Create a new client instance (a subclass of Node).

    :param unicode basedir: the node directory (which may not exist yet)

    :param _client_factory: (for testing) a callable that returns an
        instance of :class:`allmydata.node.Node` (or a subclass). By default
        this is :class:`allmydata.client._Client`

    :returns: Deferred yielding an instance of :class:`allmydata.client._Client`
    """
    try:
        node.create_node_dir(basedir, CLIENT_README)
        cfg = read_config(basedir, u"client.port")
        # create_client_from_config is asynchronous: it returns a Deferred.
        return create_client_from_config(
            cfg,
            _client_factory=_client_factory,
        )
    except Exception:
        # Convert synchronous failures into an errback'd Deferred so the
        # caller always sees the same asynchronous interface.
        return defer.fail()
---|
244 | |
---|
245 | |
---|
@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
    """
    Creates a new client instance (a subclass of Node). Most code
    should probably use `create_client` instead.

    :returns: Deferred yielding a _Client instance

    :param config: configuration instance (from read_config()) which
        encapsulates everything in the "node directory".

    :param _client_factory: for testing; the class to instantiate
        instead of _Client

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient
    """
    if _client_factory is None:
        _client_factory = _Client

    # Build the networking pieces the client needs; the providers are later
    # attached to the client as child services (see setServiceParent below).
    i2p_provider = create_i2p_provider(reactor, config)
    tor_provider = create_tor_provider(reactor, config)
    handlers = node.create_connection_handlers(config, i2p_provider, tor_provider)
    default_connection_handlers, foolscap_connection_handlers = handlers
    tub_options = node.create_tub_options(config)

    main_tub = node.create_main_tub(
        config, tub_options, default_connection_handlers,
        foolscap_connection_handlers, i2p_provider, tor_provider,
    )

    introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
    storage_broker = create_storage_farm_broker(
        config, default_connection_handlers, foolscap_connection_handlers,
        tub_options, introducer_clients, tor_provider
    )

    client = _client_factory(
        config,
        main_tub,
        i2p_provider,
        tor_provider,
        introducer_clients,
        storage_broker,
    )

    # Initialize storage separately after creating the client. This is
    # necessary because we need to pass a reference to the client in to the
    # storage plugins to allow them to initialize themselves (specifically,
    # they may want the anonymous IStorageServer implementation so they don't
    # have to duplicate all of its basic storage functionality). A better way
    # to do this, eventually, may be to create that implementation first and
    # then pass it in to both storage plugin creation and the client factory.
    # This avoids making a partially initialized client object escape the
    # client factory and removes the circular dependency between these
    # objects.
    storage_plugins = yield _StoragePlugins.from_config(
        client.get_anonymous_storage_server,
        config,
    )
    client.init_storage(storage_plugins.announceable_storage_servers)

    # Wire everything into the client's service hierarchy so it starts and
    # stops with the client.
    i2p_provider.setServiceParent(client)
    tor_provider.setServiceParent(client)
    for ic in introducer_clients:
        ic.setServiceParent(client)
    storage_broker.setServiceParent(client)
    defer.returnValue(client)
---|
314 | |
---|
315 | |
---|
@attr.s
class _StoragePlugins(object):
    """
    Functionality related to getting storage plugins set up and ready for use.

    :ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
        announceable storage servers that should be used according to node
        configuration.
    """
    announceable_storage_servers = attr.ib()

    @classmethod
    @defer.inlineCallbacks
    def from_config(cls, get_anonymous_storage_server, config):
        """
        Load and configure storage plugins.

        :param get_anonymous_storage_server: A no-argument callable which
            returns the node's anonymous ``IStorageServer`` implementation.

        :param _Config config: The node's configuration.

        :return: A ``_StoragePlugins`` initialized from the given
            configuration.
        """
        storage_plugin_names = cls._get_enabled_storage_plugin_names(config)
        plugins = list(cls._collect_storage_plugins(storage_plugin_names))
        # A plugin enabled in configuration but not installed is a
        # configuration error: fail loudly rather than run without it.
        unknown_plugin_names = storage_plugin_names - {plugin.name for plugin in plugins}
        if unknown_plugin_names:
            raise configutil.UnknownConfigError(
                "Storage plugins {} are enabled but not known on this system.".format(
                    unknown_plugin_names,
                ),
            )
        announceable_storage_servers = yield cls._create_plugin_storage_servers(
            get_anonymous_storage_server,
            config,
            plugins,
        )
        defer.returnValue(cls(
            announceable_storage_servers,
        ))

    @classmethod
    def _get_enabled_storage_plugin_names(cls, config):
        """
        Get the names of storage plugins that are enabled in the configuration.

        Reads the comma-separated ``[storage]plugins`` item; an absent or
        empty item yields an empty set.
        """
        return set(
            config.get_config(
                "storage", "plugins", ""
            ).split(u",")
        ) - {u""}

    @classmethod
    def _collect_storage_plugins(cls, storage_plugin_names):
        """
        Get the storage plugins with names matching those given.

        Scans all ``IFoolscapStoragePlugin`` providers via twisted's plugin
        system and keeps only those whose name was requested.
        """
        return list(
            plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
            if plugin.name in storage_plugin_names
        )

    @classmethod
    def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
        """
        Cause each storage plugin to instantiate its storage server and return
        them all.

        Each resulting announcement also gets the plugin's name added under
        the ``"name"`` key (via ``_add_to_announcement``).

        :return: A ``Deferred`` that fires with storage servers instantiated
            by all of the given storage server plugins.
        """
        return defer.gatherResults(
            list(
                plugin.get_storage_server(
                    cls._get_storage_plugin_configuration(config, plugin.name),
                    get_anonymous_storage_server,
                ).addCallback(
                    partial(
                        _add_to_announcement,
                        {u"name": plugin.name},
                    ),
                )
                for plugin
                # The order is fairly arbitrary and it is not meant to convey
                # anything but providing *some* stable ordering makes the data
                # a little easier to deal with (mainly in tests and when
                # manually inspecting it).
                in sorted(plugins, key=lambda p: p.name)
            ),
        )

    @classmethod
    def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
        """
        Load the configuration for a storage server plugin with the given name.

        A missing ``[storageserver.plugins.<name>]`` section is treated as an
        empty configuration rather than an error.

        :return dict[bytes, bytes]: The matching configuration.
        """
        try:
            config = config.items(
                "storageserver.plugins." + storage_plugin_name,
            )
        except NoSectionError:
            config = []
        return dict(config)
---|
425 | |
---|
426 | |
---|
427 | |
---|
def _sequencer(config):
    """
    :returns: a 2-tuple consisting of a new announcement
        sequence-number and random nonce (int, unicode). Reads and
        re-writes configuration file "announcement-seqnum" (starting at 1
        if that file doesn't exist).
    """
    raw = config.get_config_from_file("announcement-seqnum")
    current = int(raw.strip()) if raw else 0
    next_seqnum = current + 1
    # persist the incremented value for the next announcement
    config.write_config_file("announcement-seqnum", "{}\n".format(next_seqnum))
    return next_seqnum, _make_secret().strip()
---|
443 | |
---|
444 | |
---|
def create_introducer_clients(config, main_tub, _introducer_factory=None):
    """
    Read, validate and parse any 'introducers.yaml' configuration.

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient

    :returns: a list of IntroducerClient instances
    """
    if _introducer_factory is None:
        _introducer_factory = IntroducerClient

    clients = []
    introducers = config.get_introducer_configuration()
    for petname, (furl, cache_path) in list(introducers.items()):
        clients.append(
            _introducer_factory(
                main_tub,
                furl.encode("ascii"),
                config.nickname,
                str(allmydata.__full_version__),
                str(_Client.OLDEST_SUPPORTED_VERSION),
                # each client gets its own seqnum/nonce source, bound to
                # this node's configuration
                partial(_sequencer, config),
                cache_path,
            )
        )
    return clients
---|
474 | |
---|
475 | |
---|
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
    """
    Create a StorageFarmBroker object, for use by Uploader/Downloader
    (and everybody else who wants to use storage servers)

    :param config: a _Config instance

    :param default_connection_handlers: default Foolscap handlers

    :param foolscap_connection_handlers: available/configured Foolscap
        handlers

    :param dict tub_options: how to configure our Tub

    :param list introducer_clients: IntroducerClient instances if
        we're connecting to any
    """
    client_config = storage_client.StorageClientConfig.from_node_config(config)
    # Fail fast: make sure every plugin named in the configuration can be
    # loaded, before any storage-clients are constructed.
    client_config.get_configured_storage_plugins()

    def make_tub(handler_overrides=None, **kwargs):
        # Tub factory handed to the broker; builds Tubs with this node's
        # options and connection handlers.
        overrides = {} if handler_overrides is None else handler_overrides
        return node.create_tub(
            tub_options,
            default_connection_handlers,
            foolscap_connection_handlers,
            handler_overrides=overrides,
            **kwargs
        )

    broker = storage_client.StorageFarmBroker(
        permute_peers=True,
        tub_maker=make_tub,
        node_config=config,
        storage_client_config=client_config,
        default_connection_handlers=default_connection_handlers,
        tor_provider=tor_provider,
    )
    for introducer_client in introducer_clients:
        broker.use_introducer(introducer_client)
    return broker
---|
523 | |
---|
524 | |
---|
def _register_reference(key, config, tub, referenceable):
    """
    Register a referenceable in a tub with a stable fURL.

    Stability is achieved by storing the fURL in the configuration the first
    time and then reading it back for all future calls.

    :param bytes key: An identifier for this reference which can be used to
        identify its fURL in the configuration.

    :param _Config config: The configuration to use for fURL persistence.

    :param Tub tub: The tub in which to register the reference.

    :param Referenceable referenceable: The referenceable to register in the
        Tub.

    :return bytes: The fURL at which the object is registered.
    """
    existing_furl = config.get_private_config(key, default=None)
    # re-use the previously persisted swissnum, if any, so the fURL stays
    # the same across restarts
    stable_name = None
    if existing_furl is not None:
        _, _, stable_name = decode_furl(existing_furl)
    furl = tub.registerReference(referenceable, name=stable_name)
    if existing_furl is None:
        config.write_private_config(key, furl)
    return furl
---|
558 | |
---|
559 | |
---|
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
    """
    Pair a storage server with the announcement to publish for it (see
    ``IAnnounceableStorageServer``).
    """
    # the announcement to publish for this server (treated as a dict by
    # _add_to_announcement)
    announcement = attr.ib()
    # the storage server being announced
    storage_server = attr.ib()
---|
565 | |
---|
566 | |
---|
567 | |
---|
def _add_to_announcement(information, announceable_storage_server):
    """
    Create a new ``AnnounceableStorageServer`` based on
    ``announceable_storage_server`` with ``information`` added to its
    ``announcement``.  The original is left unmodified.
    """
    merged = announceable_storage_server.announcement.copy()
    merged.update(information)
    return AnnounceableStorageServer(
        merged,
        announceable_storage_server.storage_server,
    )
---|
580 | |
---|
581 | |
---|
def storage_enabled(config):
    """
    Is storage enabled according to the given configuration object?

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if storage is enabled, ``False`` otherwise.
        Defaults to ``True`` when the item is absent.
    """
    enabled = config.get_config("storage", "enabled", True, boolean=True)
    return enabled
---|
591 | |
---|
592 | |
---|
def anonymous_storage_enabled(config):
    """
    Is anonymous access to storage enabled according to the given
    configuration object?

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if anonymous storage access is enabled,
        ``False`` otherwise (including when storage itself is disabled).
    """
    if not storage_enabled(config):
        return False
    return config.get_config("storage", "anonymous", True, boolean=True)
---|
606 | |
---|
607 | |
---|
@implementer(IStatsProducer)
class _Client(node.Node, pollmixin.PollMixin):
    """
    This class should be refactored; see
    https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931
    """

    # default (relative) directory name for share storage
    STOREDIR = 'storage'
    NODETYPE = "client"
    # if this file exists in the node directory, the node polls it and exits
    # when it disappears or goes stale (see __init__)
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
                                   }
---|
633 | |
---|
    def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
                 storage_farm_broker):
        """
        Use :func:`allmydata.client.create_client` to instantiate one of these.

        The initialization order below matters: secrets and the node key
        must exist before init_client(), and init_web() must run last so it
        can look up already-registered services by name.
        """
        node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)

        self.started_timestamp = time.time()
        self.logSource = "Client"
        # per-instance copy so later mutation cannot affect the class default
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()

        self.introducer_clients = introducer_clients
        self.storage_broker = storage_farm_broker

        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self._key_generator = KeyGenerator()
        key_gen_furl = config.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            # legacy option: warn but otherwise ignore
            log.msg("[client]key_generator.furl= is now ignored, see #2783")
        self.init_client()
        self.load_static_servers()
        self.helper = None
        if config.get_config("helper", "enabled", False, boolean=True):
            # the helper requires an inbound connection, hence a listening tub
            if not self._is_tub_listening():
                raise ValueError("config error: helper is enabled, but tub "
                                 "is not listening ('tub.port=' is empty)")
            self.init_helper()
        self.init_sftp_server()

        # If the node sees an exit_trigger file, it will poll every second to see
        # whether the file still exists, and what its mtime is. If the file does not
        # exist or has not been modified for a given timeout, the node will exit.
        exit_trigger_file = config.get_config_path(self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = config.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

        # TODO this may be the wrong location for now? but as temporary measure
        # it allows us to get NURLs for testing in test_istorageserver.py. This
        # will eventually get fixed one way or another in
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3901. See also
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931 for the bigger
        # picture issue.
        self.storage_nurls : Optional[set] = None
---|
688 | |
---|
    def init_stats_provider(self):
        """
        Create the StatsProvider child service and register this client
        as one of its stats producers (see get_stats()).
        """
        self.stats_provider = StatsProvider(self)
        self.stats_provider.setServiceParent(self)
        self.stats_provider.register_producer(self)
---|
693 | |
---|
694 | def get_stats(self): |
---|
695 | return { 'node.uptime': time.time() - self.started_timestamp } |
---|
696 | |
---|
697 | def init_secrets(self): |
---|
698 | # configs are always unicode |
---|
699 | def _unicode_make_secret(): |
---|
700 | return str(_make_secret(), "ascii") |
---|
701 | lease_s = self.config.get_or_create_private_config( |
---|
702 | "secret", _unicode_make_secret).encode("utf-8") |
---|
703 | lease_secret = base32.a2b(lease_s) |
---|
704 | convergence_s = self.config.get_or_create_private_config( |
---|
705 | 'convergence', _unicode_make_secret).encode("utf-8") |
---|
706 | self.convergence = base32.a2b(convergence_s) |
---|
707 | self._secret_holder = SecretHolder(lease_secret, self.convergence) |
---|
708 | |
---|
709 | def init_node_key(self): |
---|
710 | # we only create the key once. On all subsequent runs, we re-use the |
---|
711 | # existing key |
---|
712 | def _make_key(): |
---|
713 | private_key, _ = ed25519.create_signing_keypair() |
---|
714 | # Config values are always unicode: |
---|
715 | return str(ed25519.string_from_signing_key(private_key) + b"\n", "utf-8") |
---|
716 | |
---|
717 | private_key_str = self.config.get_or_create_private_config( |
---|
718 | "node.privkey", _make_key).encode("utf-8") |
---|
719 | private_key, public_key = ed25519.signing_keypair_from_string(private_key_str) |
---|
720 | public_key_str = ed25519.string_from_verifying_key(public_key) |
---|
721 | self.config.write_config_file("node.pubkey", public_key_str + b"\n", "wb") |
---|
722 | self._node_private_key = private_key |
---|
723 | self._node_public_key = public_key |
---|
724 | |
---|
    def get_long_nodeid(self):
        """
        Return this node's ed25519 verifying key with its "pub-" prefix
        stripped.
        """
        # this matches what IServer.get_longname() says about us elsewhere
        vk_string = ed25519.string_from_verifying_key(self._node_public_key)
        return remove_prefix(vk_string, b"pub-")
---|
729 | |
---|
    def get_long_tubid(self):
        """Return this node's Tub ID (self.nodeid) in its ASCII form."""
        return idlib.nodeid_b2a(self.nodeid)
---|
732 | |
---|
    def get_web_service(self):
        """
        :return: a reference to our web server

        :raises KeyError: if the "webish" service was never started (i.e.
            no [node]web.port was configured).
        """
        return self.getServiceNamed("webish")
---|
738 | |
---|
def _init_permutation_seed(self, ss):
    """
    Return the permutation seed announced to clients, creating and
    persisting it on first use.

    :param ss: our StorageServer; consulted for whether we already hold
        shares, which pins us to the legacy TubID-based seed.

    :return: the seed (stripped of the trailing newline).
    """
    seed = self.config.get_config_from_file("permutation-seed")
    if not seed:
        have_shares = ss.have_shares()
        if have_shares:
            # if the server has shares but not a recorded
            # permutation-seed, then it has been around since pre-#466
            # days, and the clients who uploaded those shares used our
            # TubID as a permutation-seed. We should keep using that same
            # seed to keep the shares in the same place in the permuted
            # ring, so those clients don't have to perform excessive
            # searches.
            seed = base32.b2a(self.nodeid)
        else:
            # otherwise, we're free to use the more natural seed of our
            # pubkey-based serverid
            vk_string = ed25519.string_from_verifying_key(self._node_public_key)
            vk_bytes = remove_prefix(vk_string, ed25519.PUBLIC_KEY_PREFIX)
            seed = base32.b2a(vk_bytes)
        # Persist whichever seed we chose so future runs announce the same one.
        self.config.write_config_file("permutation-seed", seed+b"\n", mode="wb")
    return seed.strip()
---|
760 | |
---|
def get_anonymous_storage_server(self):
    """
    Get the anonymous ``IStorageServer`` implementation for this node.

    Note this will return an object even if storage is disabled on this
    node (but the object will not be exposed, peers will not be able to
    access it, and storage will remain disabled).

    The one and only instance for this node is always returned.  It is
    created first if necessary.

    :raise ValueError: if [storage]reserved_space cannot be parsed.
    """
    # Return the existing instance if it was already created and attached.
    try:
        ss = self.getServiceNamed(StorageServer.name)
    except KeyError:
        pass
    else:
        return ss

    readonly = self.config.get_config("storage", "readonly", False, boolean=True)

    # Read via self.config.get_config for consistency with every other
    # option read in this method (previously this one call went through
    # a bare self.get_config).
    config_storedir = self.config.get_config(
        "storage", "storage_dir", self.STOREDIR,
    )
    storedir = self.config.get_config_path(config_storedir)

    data = self.config.get_config("storage", "reserved_space", None)
    try:
        reserved = parse_abbreviated_size(data)
    except ValueError:
        log.msg("[storage]reserved_space= contains unparseable value %s"
                % data)
        raise
    if reserved is None:
        reserved = 0
    discard = self.config.get_config("storage", "debug_discard", False,
                                     boolean=True)

    expire = self.config.get_config("storage", "expire.enabled", False, boolean=True)
    if expire:
        mode = self.config.get_config("storage", "expire.mode")  # require a mode
    else:
        mode = self.config.get_config("storage", "expire.mode", "age")

    o_l_d = self.config.get_config("storage", "expire.override_lease_duration", None)
    if o_l_d is not None:
        o_l_d = parse_duration(o_l_d)

    cutoff_date = None
    if mode == "cutoff-date":
        cutoff_date = self.config.get_config("storage", "expire.cutoff_date")
        cutoff_date = parse_date(cutoff_date)

    # Which share types are subject to lease expiration (both by default).
    sharetypes = []
    if self.config.get_config("storage", "expire.immutable", True, boolean=True):
        sharetypes.append("immutable")
    if self.config.get_config("storage", "expire.mutable", True, boolean=True):
        sharetypes.append("mutable")
    expiration_sharetypes = tuple(sharetypes)

    ss = StorageServer(
        storedir, self.nodeid,
        reserved_space=reserved,
        discard_storage=discard,
        readonly_storage=readonly,
        stats_provider=self.stats_provider,
        expiration_enabled=expire,
        expiration_mode=mode,
        expiration_override_lease_duration=o_l_d,
        expiration_cutoff_date=cutoff_date,
        expiration_sharetypes=expiration_sharetypes,
    )
    ss.setServiceParent(self)
    return ss
---|
834 | |
---|
def init_storage(self, announceable_storage_servers):
    """
    Create, register, and announce our storage server, if storage is
    enabled in the configuration.

    :param announceable_storage_servers: the storage-plugin servers whose
        announcements should be merged into ours.

    :raise ValueError: if storage is enabled but the tub is not listening.
    """
    # should we run a storage server (and publish it for others to use)?
    if not storage_enabled(self.config):
        return
    if not self._is_tub_listening():
        raise ValueError("config error: storage is enabled, but tub "
                         "is not listening ('tub.port=' is empty)")

    ss = self.get_anonymous_storage_server()
    announcement = {
        "permutation-seed-base32": self._init_permutation_seed(ss),
    }

    if anonymous_storage_enabled(self.config):
        furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
        furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
        (_, _, swissnum) = decode_furl(furl)
        # NOTE(review): the presence of add_storage_server on the
        # negotiation class is used as a feature test for the HTTP-based
        # storage protocol — confirm against the Tub subclass in use.
        if hasattr(self.tub.negotiationClass, "add_storage_server"):
            nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
            self.storage_nurls = nurls
            # There is code in e.g. storage_client.py that checks if an
            # announcement has changed. Since NURL order isn't meaningful,
            # we don't want a change in the order to count as a change, so we
            # send the NURLs as a set. CBOR supports sets, as does Foolscap.
            announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = {n.to_text() for n in nurls}
        announcement["anonymous-storage-FURL"] = furl

    # Register any storage plugins and collect their announcements.
    enabled_storage_servers = self._enable_storage_servers(
        announceable_storage_servers,
    )
    storage_options = list(
        storage_server.announcement
        for storage_server
        in enabled_storage_servers
    )
    plugins_announcement = {}
    if storage_options:
        # Only add the new key if there are any plugins enabled.
        plugins_announcement[u"storage-options"] = storage_options

    announcement.update(plugins_announcement)

    if self.config.get_config("storage", "grid_management", default=False, boolean=True):
        grid_manager_certificates = self.config.get_grid_manager_certificates()
        announcement[u"grid-manager-certificates"] = grid_manager_certificates

        # Note: certificates are not verified for validity here, but
        # that may be useful. See:
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3977

    # Tell every configured introducer about our storage service.
    for ic in self.introducer_clients:
        ic.publish("storage", announcement, self._node_private_key)
---|
887 | |
---|
def get_client_storage_plugin_web_resources(self):
    """
    Get all of the client-side ``IResource`` implementations provided by
    enabled storage plugins.

    :return dict[bytes, IResource provider]: The implementations.
    """
    broker = self.storage_broker
    return broker.get_client_storage_plugin_web_resources(self.config)
---|
898 | |
---|
def _enable_storage_servers(self, announceable_storage_servers):
    """
    Register and announce the given storage servers.

    Lazily yields one registered/announced server per input.
    """
    yield from map(self._enable_storage_server, announceable_storage_servers)
---|
905 | |
---|
def _enable_storage_server(self, announceable_storage_server):
    """
    Register a storage server with the tub and fold its FURL into its
    announcement.
    """
    # Oops, why don't I have a better handle on this value?
    plugin_name = announceable_storage_server.announcement[u"name"]
    config_key = "storage-plugin.{}.furl".format(plugin_name)
    furl = _register_reference(
        config_key,
        self.config,
        self.tub,
        announceable_storage_server.storage_server,
    )
    return _add_to_announcement(
        {u"storage-server-FURL": furl},
        announceable_storage_server,
    )
---|
925 | |
---|
def init_client(self):
    """
    Initialize the client-side services: erasure-coding parameters, the
    local API auth token, history, the uploader, the blacklist, and the
    nodemaker.
    """
    helper_furl = self.config.get_config("client", "helper.furl", None)
    # Treat the literal strings "None" and "" in the config as "no helper".
    if helper_furl in ("None", ""):
        helper_furl = None

    DEP = self.encoding_params
    DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"]))
    DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"]))
    DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"]))
    # At the moment this is only used for testing, thus the janky config
    # attribute name.
    DEP["max_segment_size"] = int(self.config.get_config(
        "client",
        "shares._max_immutable_segment_size_for_testing",
        DEP["max_segment_size"])
    )

    # for the CLI to authenticate to local JSON endpoints
    self._create_auth_token()

    self.history = History(self.stats_provider)
    self.terminator = Terminator()
    self.terminator.setServiceParent(self)
    # The uploader talks to the helper (if any) and records stats/history.
    uploader = Uploader(
        helper_furl,
        self.stats_provider,
        self.history,
    )
    uploader.setServiceParent(self)
    # Blacklist must exist before the nodemaker, which consumes it.
    self.init_blacklist()
    self.init_nodemaker()
---|
957 | |
---|
def get_auth_token(self):
    """
    This returns a local authentication token, which is just some
    random data in "api_auth_token" which must be echoed to API
    calls.
    """
    token = self.config.get_private_config('api_auth_token')
    return token.encode("ascii")
---|
966 | |
---|
def _create_auth_token(self):
    """
    Creates new auth-token data written to 'private/api_auth_token'.

    This is intentionally re-created every time the node starts.
    """
    # 32 random bytes, URL-safe base64 encoded, newline-terminated.
    token = urlsafe_b64encode(os.urandom(32)) + b'\n'
    self.config.write_private_config('api_auth_token', token)
---|
977 | |
---|
def get_storage_broker(self):
    """Return this node's storage broker."""
    broker = self.storage_broker
    return broker
---|
980 | |
---|
def load_static_servers(self):
    """
    Load the servers.yaml file if it exists, and provide the static
    server data to the StorageFarmBroker.
    """
    fn = self.config.get_private_path("servers.yaml")
    servers_filepath = FilePath(fn)
    try:
        with servers_filepath.open() as f:
            servers_yaml = yamlutil.safe_load(f)
            static_servers = servers_yaml.get("storage", {})
            log.msg("found %d static servers in private/servers.yaml" %
                    len(static_servers))
            # Normalize server IDs to text; values pass through unchanged.
            static_servers = {
                ensure_text(key): value for (key, value) in static_servers.items()
            }
            self.storage_broker.set_static_servers(static_servers)
    except EnvironmentError:
        # The file is optional: a missing (or unreadable) servers.yaml
        # simply means no static servers are configured.
        pass
---|
1000 | |
---|
def init_blacklist(self):
    """Create the access blacklist from BASEDIR/access.blacklist."""
    blacklist_path = self.config.get_config_path("access.blacklist")
    self.blacklist = Blacklist(blacklist_path)
---|
1004 | |
---|
def init_nodemaker(self):
    """
    Create the NodeMaker used to build filenodes and dirnodes, honoring
    the [client]mutable.format preference.
    """
    default = self.config.get_config("client", "mutable.format", default="SDMF")
    # Any value other than (case-insensitive) "MDMF" falls back to SDMF.
    if default.upper() == "MDMF":
        self.mutable_file_default = MDMF_VERSION
    else:
        self.mutable_file_default = SDMF_VERSION
    self.nodemaker = NodeMaker(self.storage_broker,
                               self._secret_holder,
                               self.get_history(),
                               self.getServiceNamed("uploader"),
                               self.terminator,
                               self.get_encoding_parameters(),
                               self.mutable_file_default,
                               self._key_generator,
                               self.blacklist)
---|
1020 | |
---|
def get_history(self):
    """Return the History instance recording upload/download activity."""
    return self.history
---|
1023 | |
---|
def init_helper(self):
    """
    Start the upload Helper service and register it with our tub; the
    helper's FURL is written to BASEDIR/private/helper.furl.
    """
    self.helper = Helper(self.config.get_config_path("helper"),
                         self.storage_broker, self._secret_holder,
                         self.stats_provider, self.history)
    # TODO: this is confusing. BASEDIR/private/helper.furl is created by
    # the helper. BASEDIR/helper.furl is consumed by the client who wants
    # to use the helper. I like having the filename be the same, since
    # that makes 'cp' work smoothly, but the difference between config
    # inputs and generated outputs is hard to see.
    helper_furlfile = self.config.get_private_path("helper.furl").encode(get_filesystem_encoding())
    self.tub.registerReference(self.helper, furlFile=helper_furlfile)
---|
1035 | |
---|
def _get_tempdir(self):
    """
    Determine the path to the directory where temporary files for this node
    should be written.

    The [node]tempdir setting (default "tmp") is resolved relative to the
    node's base directory, and the directory is created if missing.

    :return: The path, which will exist and be a directory.
        NOTE(review): previously documented as bytes, but
        ``get_config_path`` appears to return text — confirm.
    """
    tempdir_config = self.config.get_config("node", "tempdir", "tmp")
    if isinstance(tempdir_config, bytes):
        tempdir_config = tempdir_config.decode('utf-8')
    tempdir = self.config.get_config_path(tempdir_config)
    if not os.path.exists(tempdir):
        fileutil.make_dirs(tempdir)
    return tempdir
---|
1050 | |
---|
1051 | def init_web(self, webport): |
---|
1052 | self.log("init_web(webport=%s)", args=(webport,)) |
---|
1053 | |
---|
1054 | from allmydata.webish import WebishServer, anonymous_tempfile_factory |
---|
1055 | nodeurl_path = self.config.get_config_path("node.url") |
---|
1056 | staticdir_config = self.config.get_config("node", "web.static", "public_html") |
---|
1057 | staticdir = self.config.get_config_path(staticdir_config) |
---|
1058 | ws = WebishServer( |
---|
1059 | self, |
---|
1060 | webport, |
---|
1061 | anonymous_tempfile_factory(self._get_tempdir()), |
---|
1062 | nodeurl_path, |
---|
1063 | staticdir, |
---|
1064 | ) |
---|
1065 | ws.setServiceParent(self) |
---|
1066 | |
---|
def init_sftp_server(self):
    """
    Start the SFTP frontend, if [sftpd]enabled is set; otherwise do
    nothing.
    """
    if not self.config.get_config("sftpd", "enabled", False, boolean=True):
        return
    accountfile = self.config.get_config("sftpd", "accounts.file", None)
    if accountfile:
        accountfile = self.config.get_config_path(accountfile)
    sftp_portstr = self.config.get_config("sftpd", "port", "tcp:8022")
    pubkey_file = self.config.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.config.get_config("sftpd", "host_privkey_file")

    from allmydata.frontends import sftpd
    sftp_service = sftpd.SFTPServer(self, accountfile, sftp_portstr,
                                    pubkey_file, privkey_file)
    sftp_service.setServiceParent(self)
---|
1080 | |
---|
def _check_exit_trigger(self, exit_trigger_file):
    """
    Stop the reactor unless ``exit_trigger_file`` exists and was
    modified within the last 120 seconds.
    """
    if os.path.exists(exit_trigger_file):
        mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
        if mtime > time.time() - 120.0:
            # File is fresh enough: keep running.
            return
        else:
            self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
    else:
        self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
    # Reached only when the trigger file is missing or stale.
    reactor.stop()
---|
1091 | |
---|
def get_encoding_parameters(self):
    """Return the erasure-coding parameters dict (k, happy, n, ...)."""
    return self.encoding_params
---|
1094 | |
---|
def introducer_connection_statuses(self):
    """
    Return one connection-status per introducer client, in the same
    order as ``self.introducer_clients``.
    """
    statuses = []
    for client in self.introducer_clients:
        statuses.append(client.connection_status())
    return statuses
---|
1097 | |
---|
def connected_to_introducer(self):
    """
    Tell whether at least one introducer client is currently connected.

    :return bool: True if any configured introducer connection is up.
    """
    # Generator expression (not a list) lets any() short-circuit on the
    # first connected client and avoids a throwaway list.
    return any(ic.connected_to_introducer() for ic in self.introducer_clients)
---|
1100 | |
---|
def get_renewal_secret(self):  # this will go away
    """Return the lease-renewal secret from our secret holder."""
    holder = self._secret_holder
    return holder.get_renewal_secret()
---|
1103 | |
---|
def get_cancel_secret(self):
    """Return the lease-cancel secret from our secret holder."""
    holder = self._secret_holder
    return holder.get_cancel_secret()
---|
1106 | |
---|
def debug_wait_for_client_connections(self, num_clients):
    """Return a Deferred that fires (with None) when we have connections
    to the given number of peers. Useful for tests that set up a
    temporary test network and need to know when it is safe to proceed
    with an upload or download.

    :param num_clients: the number of connected servers to wait for.
    """
    def _check():
        # True once the storage broker reports enough connected servers.
        return len(self.storage_broker.get_connected_servers()) >= num_clients
    # Poll every 0.5s (self.poll — presumably from pollmixin; confirm).
    d = self.poll(_check, 0.5)
    # Swallow the poll result so callers always receive None.
    d.addCallback(lambda res: None)
    return d
---|
1117 | |
---|
1118 | |
---|
1119 | # these four methods are the primitives for creating filenodes and |
---|
1120 | # dirnodes. The first takes a URI and produces a filenode or (new-style) |
---|
1121 | # dirnode. The other three create brand-new filenodes/dirnodes. |
---|
1122 | |
---|
def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
    """
    Turn a cap (URI) into a filenode or (new-style) dirnode.

    :param write_uri: the write-cap, or None.
    :param read_uri: optional read-cap.
    :param deep_immutable: forwarded to NodeMaker.create_from_cap.
        NOTE(review): semantics inferred from the name (require the
        result to be deeply immutable) — confirm against NodeMaker.
    :param name: a human-readable label for error reporting.
    """
    # This returns synchronously.
    # Note that it does *not* validate the write_uri and read_uri; instead we
    # may get an opaque node if there were any problems.
    return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
---|
1128 | |
---|
def create_dirnode(
    self,
    initial_children: dict | None = None,
    version: int | None = None,
    *,
    unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None
) -> DirectoryNode:
    """
    Create a new directory.

    :param initial_children: If given, a structured dict representing the
        initial content of the created directory. See
        `docs/frontends/webapi.rst` for examples.

    :param version: If given, an int representing the mutable file format
        of the new object. Acceptable values are currently `SDMF_VERSION`
        or `MDMF_VERSION` (corresponding to 0 or 1, respectively, as
        defined in `allmydata.interfaces`). If no such value is provided,
        the default mutable format will be used (currently SDMF).

    :param unique_keypair: an optional tuple containing the RSA public
        and private key to be used for the new directory. Typically, this
        value is omitted (in which case a new random keypair will be
        generated at creation time).

        **Warning** This value independently determines the identity of
        the mutable object to create. There cannot be two different
        mutable objects that share a keypair. They will merge into one
        object (with undefined contents).

    :return: A Deferred which will fire with a representation of the new
        directory after it has been created.
    """
    return self.nodemaker.create_new_mutable_directory(
        initial_children,
        version=version,
        keypair=unique_keypair,
    )
---|
1168 | |
---|
def create_immutable_dirnode(self, children, convergence=None):
    """
    Create (and upload) a new immutable directory containing ``children``.
    """
    maker = self.nodemaker
    return maker.create_immutable_directory(children, convergence)
---|
1171 | |
---|
def create_mutable_file(
    self,
    contents: bytes | None = None,
    version: int | None = None,
    *,
    unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None,
) -> MutableFileNode:
    """
    Create *and upload* a new mutable object.

    :param contents: If given, the initial contents for the new object.

    :param version: If given, the mutable file format for the new object
        (otherwise a format will be chosen automatically).

    :param unique_keypair: **Warning** This value independently determines
        the identity of the mutable object to create. There cannot be two
        different mutable objects that share a keypair. They will merge
        into one object (with undefined contents).

        It is common to pass a None value (or not pass a value) for this
        parameter. In these cases, a new random keypair will be
        generated.

        If non-None, the given public/private keypair will be used for the
        new object. The expected use-case is for implementing compliance
        tests.

    :return: A Deferred which will fire with a representation of the new
        mutable object after it has been uploaded.
    """
    return self.nodemaker.create_mutable_file(contents,
                                              version=version,
                                              keypair=unique_keypair)
---|
1206 | |
---|
def upload(self, uploadable, reactor=None):
    """
    Upload ``uploadable`` via the "uploader" service, returning whatever
    that service's upload() returns.
    """
    return self.getServiceNamed("uploader").upload(uploadable, reactor=reactor)
---|