1 | # coding: utf-8 |
---|
2 | |
---|
3 | """ |
---|
4 | Ported to Python 3. |
---|
5 | """ |
---|
6 | |
---|
7 | from typing import Union, Optional |
---|
8 | |
---|
9 | import os, sys, textwrap |
---|
10 | import codecs |
---|
11 | from os.path import join |
---|
12 | import urllib.parse |
---|
13 | |
---|
14 | from yaml import ( |
---|
15 | safe_dump, |
---|
16 | ) |
---|
17 | |
---|
18 | from twisted.python import usage |
---|
19 | |
---|
20 | from allmydata.util.assertutil import precondition |
---|
21 | from allmydata.util.encodingutil import quote_output, \ |
---|
22 | quote_local_unicode_path, argv_to_abspath |
---|
23 | from allmydata.scripts.default_nodedir import _default_nodedir |
---|
24 | from .types_ import Parameters |
---|
25 | |
---|
26 | |
---|
def get_default_nodedir():
    """Return the default node directory (the imported ``_default_nodedir``)."""
    return _default_nodedir
---|
29 | |
---|
def wrap_paragraphs(text, width):
    """
    Re-flow ``text`` to ``width`` columns, one paragraph at a time.

    The text is dedented first and a single leading newline is dropped.
    Paragraphs are the pieces separated by blank lines (a double newline);
    each one is filled independently with ``textwrap.fill`` and the results
    are joined back together with a blank line between them.
    """
    dedented = textwrap.dedent(text)
    if dedented[:1] == "\n":
        dedented = dedented[1:]

    filled = []
    for paragraph in dedented.split("\n\n"):
        filled.append(textwrap.fill(paragraph, width=width))
    return "\n\n".join(filled)
---|
38 | |
---|
class BaseOptions(usage.Options):
    """
    Common base for the option classes in this module.

    Remembers the name the program was invoked as, rejects ``--version`` on
    subcommands, and renders a help text made of the synopsis, the usage
    text and the optional ``description`` / ``description_unwrapped``
    attributes.
    """

    # Optional help text; re-wrapped to the terminal width by __str__.
    description: Optional[str] = None
    # Optional help text; dedented but otherwise emitted as written.
    description_unwrapped: Optional[str] = None

    def __init__(self):
        super().__init__()
        # Name of the running program, taken from argv[0] without its directory.
        self.command_name = os.path.basename(sys.argv[0])

    # Only allow "tahoe --version", not e.g. "tahoe <cmd> --version".
    # NOTE(review): this is a comment rather than a docstring on purpose;
    # twisted's usage.Options may surface opt_* docstrings as help text.
    def opt_version(self):
        raise usage.UsageError("--version not allowed on subcommands")

    def __str__(self):
        """Return the full help text for this command."""
        columns = int(os.environ.get('COLUMNS', '80'))
        pieces = [
            self.getSynopsis(),
            '\n',
            "(use 'tahoe --help' to view global options)\n",
            '\n',
            self.getUsage(),
        ]
        if self.description:
            pieces.extend(
                ['\n', wrap_paragraphs(self.description, columns), '\n']
            )
        if self.description_unwrapped:
            unwrapped = textwrap.dedent(self.description_unwrapped)
            # Drop the newline that usually follows an opening triple quote.
            if unwrapped[:1] == "\n":
                unwrapped = unwrapped[1:]
            pieces.extend(['\n', unwrapped, '\n'])
        return "".join(pieces)
---|
65 | |
---|
class BasedirOptions(BaseOptions):
    """
    Options for commands that operate on a node base directory.

    The directory may come from the positional argument, from
    ``--basedir``/``-C``, from the top-level ``--node-directory`` option, or
    from ``default_nodedir``; giving more than one of the first three is an
    error.
    """

    # Fallback used when no directory is given in any other way.
    default_nodedir = _default_nodedir

    optParameters : Parameters = [
        ["basedir", "C", None,
         "Specify which Tahoe base directory should be used. [default: %s]"
         % quote_local_unicode_path(_default_nodedir)],
    ]

    def parseArgs(self, basedir=None):
        """
        Resolve the base directory and store it under both ``basedir`` and
        ``node-directory``.

        :raises usage.UsageError: if two sources conflict, or if no source
            provides a directory and there is no default.
        """
        # Walk up to the outermost options object so that the
        # node-directory option is found even from within a subcommand.
        top = self.parent
        while top.parent is not None:
            top = top.parent

        node_directory_opt = top['node-directory']
        basedir_opt = self['basedir']

        if node_directory_opt and basedir_opt:
            raise usage.UsageError("The --node-directory (or -d) and --basedir (or -C) options cannot both be used.")
        if node_directory_opt and basedir:
            raise usage.UsageError("The --node-directory (or -d) option and a basedir argument cannot both be used.")
        if basedir_opt and basedir:
            raise usage.UsageError("The --basedir (or -C) option and a basedir argument cannot both be used.")

        # Precedence: positional argument, then --basedir, then --node-directory.
        explicit = basedir or basedir_opt or node_directory_opt
        if explicit:
            resolved = argv_to_abspath(explicit)
        elif self.default_nodedir:
            resolved = self.default_nodedir
        else:
            raise usage.UsageError("No default basedir available, you must provide one with --node-directory, --basedir, or a basedir argument")

        self['basedir'] = resolved
        self['node-directory'] = resolved

    def postOptions(self):
        """Reject a parse that ended without a base directory."""
        if self['basedir']:
            return
        raise usage.UsageError("A base directory for the node must be provided.")
---|
103 | |
---|
class NoDefaultBasedirOptions(BasedirOptions):
    """
    Variant of ``BasedirOptions`` with no default node directory, so the
    caller has to name one explicitly.
    """

    default_nodedir = None

    optParameters : Parameters = [
        ["basedir", "C", None, "Specify which Tahoe base directory should be used."],
    ]

    # This override exists so that giving more than one positional argument
    # produces a "Wrong number of arguments." error.
    def parseArgs(self, basedir=None):
        BasedirOptions.parseArgs(self, basedir)

    def getSynopsis(self):
        """Return the one-line usage synopsis, which requires a NODEDIR."""
        names = (self.command_name, self.subcommand_name)
        return "Usage: %s [global-options] %s [options] NODEDIR" % names
---|
118 | |
---|
119 | |
---|
# Name of the alias that get_aliases() binds to the contents of
# private/root_dir.cap.
DEFAULT_ALIAS = "tahoe"
---|
121 | |
---|
122 | |
---|
def write_introducer(basedir, petname, furl):
    """
    Replace the node's ``private/introducers.yaml`` with a file that lists
    exactly one introducer.

    :param basedir: the node directory; must offer ``child()`` (presumably a
        twisted ``FilePath`` -- confirm against callers).
    :param petname: key under which the introducer is recorded.
    :param furl: the introducer fURL; ``bytes`` are decoded as UTF-8 first.
    """
    furl_text = furl.decode("utf-8") if isinstance(furl, bytes) else furl

    private_dir = basedir.child(b"private")
    private_dir.makedirs(ignoreExistingDirectory=True)

    target = private_dir.child(b"introducers.yaml")
    document = {
        "introducers": {
            petname: {
                "furl": furl_text,
            },
        },
    }
    target.setContent(safe_dump(document).encode("ascii"))
---|
141 | |
---|
142 | |
---|
def get_introducer_furl(nodedir, config):
    """
    Find an introducer fURL for the node living in ``nodedir``.

    The first entry reported by ``config.get_introducer_configuration()``
    wins.  When there are no entries, ``private/introducer.furl`` inside
    ``nodedir`` is read instead, on the guess that the node is itself an
    introducer.

    :return: the introducer fURL
    :raises Exception: if no introducer is configured and the fallback file
        cannot be read.
    """
    configured = config.get_introducer_configuration()
    for furl, _cache in configured.values():
        # Only the first configured introducer is ever used.
        return furl

    # No configured introducers: maybe this is running *on* the introducer.
    furl_path = join(nodedir, "private", "introducer.furl")
    try:
        with open(furl_path, "r") as furl_file:
            contents = furl_file.read()
    except IOError:
        raise Exception(
            "Can't find introducer FURL in tahoe.cfg nor "
            "{}/private/introducer.furl".format(nodedir)
        )
    return contents.strip()
---|
161 | |
---|
162 | |
---|
def get_aliases(nodedir):
    """
    Load the alias table of the node in ``nodedir``.

    Two files under ``private/`` are consulted, and a file that cannot be
    opened or read is silently skipped:

    * ``root_dir.cap`` -- if non-empty, its stripped text becomes the
      ``DEFAULT_ALIAS`` entry (stored as ``str``).
    * ``aliases`` -- UTF-8 text with one ``name: cap`` entry per line.
      Blank lines and lines starting with ``#`` are ignored; each cap is
      stripped and stored UTF-8 encoded (``bytes``).

    :return: dict mapping alias name to cap.
    """
    rootfile = os.path.join(nodedir, "private", "root_dir.cap")
    aliasfile = os.path.join(nodedir, "private", "aliases")
    found = {}

    try:
        with open(rootfile, "r") as root_fh:
            rootcap = root_fh.read().strip()
    except EnvironmentError:
        rootcap = ""
    if rootcap:
        found[DEFAULT_ALIAS] = rootcap

    try:
        with codecs.open(aliasfile, "r", "utf-8") as alias_fh:
            for raw_line in alias_fh:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue
                # Split on the first colon only: caps contain colons too.
                name, cap = entry.split(u":", 1)
                found[name] = cap.strip().encode('utf-8')
    except EnvironmentError:
        pass

    return found
---|
187 | |
---|
class DefaultAliasMarker:
    """
    Sentinel type; ``get_alias`` returns the class itself in place of a cap
    when a path names no alias and no default alias was supplied.
    """
---|
190 | |
---|
# Tests set this to True to force the letter-colon behaviour on any platform.
pretend_platform_uses_lettercolon = False


def platform_uses_lettercolon_drivename():
    """
    Report whether paths on this platform may begin with a drive letter
    followed by a colon (e.g. ``C:``).

    True when ``sys.platform`` mentions ``win32`` or ``cygwin``, or when the
    module-level ``pretend_platform_uses_lettercolon`` flag is set.
    """
    platform_name = sys.platform.lower()
    for marker in ("win32", "cygwin"):
        if marker in platform_name:
            return True
    return bool(pretend_platform_uses_lettercolon)
---|
198 | |
---|
199 | |
---|
class TahoeError(Exception):
    """An error that carries a message which can be printed via ``display``."""

    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg

    def display(self, err):
        """Print the message, followed by a newline, to the stream ``err``."""
        print(self.msg, file=err)
---|
207 | |
---|
208 | |
---|
class UnknownAliasError(TahoeError):
    """A ``TahoeError`` whose message is prefixed with ``"error: "``."""

    def __init__(self, msg):
        prefixed = "error: " + msg
        super().__init__(prefixed)
---|
212 | |
---|
213 | |
---|
def _resolve_default_alias(aliases, default, path):
    """
    Resolve ``path`` (bytes) against the default alias, for paths that name
    no alias of their own.

    Returns ``(DefaultAliasMarker, path)`` when ``default`` is None, otherwise
    the default alias' directory URI paired with ``path``.  Raises
    ``UnknownAliasError`` if ``default`` is not present in ``aliases``.
    """
    if default is None:
        return DefaultAliasMarker, path
    if default not in aliases:
        raise UnknownAliasError("No alias specified, and the default %s alias doesn't exist. "
                                "To create it, use 'tahoe create-alias %s'."
                                % (quote_output(default), quote_output(default, quotemarks=False)))
    # Imported at call time, as get_alias() itself does.
    from allmydata import uri
    return uri.from_string_dirnode(aliases[default]).to_string(), path


def get_alias(aliases, path_unicode, default):
    """
    Split ``path_unicode`` into a root cap and a UTF-8 encoded subpath.

    ``u"work:path/filename"`` becomes the directory URI stored under
    ``aliases[u"work"]`` paired with ``b"path/filename"``.  Special cases:

    * A string with a recognized cap URI prefix is used directly, so files
      and directories can be addressed by cap.  Both ``URI:blah:./foo`` (the
      older form) and ``URI:blah/foo`` select the subpath ``foo``.
    * A path with no alias part (no colon, or a ``/`` before the first colon
      as in ``foo/bar:7``) is resolved against ``default``.
    * When ``default`` is None and the platform uses drive letters, a colon
      in second position (``C:\\...``) marks a local path, not an alias.

    :param aliases: mapping of alias name to cap, e.g. from ``get_aliases``.
    :param path_unicode: the path to interpret; must be a ``str``.
    :param default: alias to use when the path names none, or None.
    :return: a 2-tuple ``(rootcap, subpath)``; ``subpath`` is ``bytes`` and
        ``rootcap`` is ``DefaultAliasMarker`` when no alias applies and
        ``default`` is None.
    :raises UnknownAliasError: if the named alias, or the needed default
        alias, is not in ``aliases``.
    """
    precondition(isinstance(path_unicode, str), path_unicode)

    from allmydata import uri
    path = path_unicode.encode('utf-8').strip(b" ")
    if uri.has_uri_prefix(path):
        # We used to require "URI:blah:./foo" in order to get a subpath,
        # stripping out the ":./" sequence. We still allow that for compatibility,
        # but now also allow just "URI:blah/foo".
        sep = path.find(b":./")
        if sep != -1:
            return path[:sep], path[sep+3:]
        sep = path.find(b"/")
        if sep != -1:
            return path[:sep], path[sep+1:]
        return path, b""

    colon = path.find(b":")
    if colon == -1:
        # no alias
        return _resolve_default_alias(aliases, default, path)
    if colon == 1 and default is None and platform_uses_lettercolon_drivename():
        # treat C:\why\must\windows\be\so\weird as a local path, not a tahoe
        # file in the "C:" alias
        return DefaultAliasMarker, path

    # decoding must succeed because path is valid UTF-8 and colon & space are ASCII
    alias = path[:colon].decode('utf-8')
    if u"/" in alias:
        # no alias, but there's a colon in a dirname/filename, like
        # "foo/bar:7"
        return _resolve_default_alias(aliases, default, path)
    if alias not in aliases:
        raise UnknownAliasError("Unknown alias %s, please create it with 'tahoe add-alias' or 'tahoe create-alias'." %
                                quote_output(alias))
    return uri.from_string_dirnode(aliases[alias]).to_string(), path[colon+1:]
---|
271 | |
---|
def escape_path(path: Union[str, bytes]) -> str:
    """
    Return path quoted to US-ASCII, valid URL characters.

    ``str`` input is encoded as UTF-8 before quoting; ``/`` separators are
    left unescaped.

    >>> path = '/føö/bar/☃'
    >>> escaped = escape_path(path)
    >>> escaped
    '/f%C3%B8%C3%B6/bar/%E2%98%83'
    """
    if isinstance(path, str):
        path = path.encode("utf-8")
    # Keeping "/" safe gives the same result as quoting each segment
    # separately and re-joining, without the str -> bytes -> str round trip.
    return urllib.parse.quote(path, safe="/")
---|