source: trunk/src/allmydata/frontends/sftpd.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

  • Property mode set to 100644
File size: 90.2 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import six
6import heapq, traceback, stat, struct
7from stat import S_IFREG, S_IFDIR
8from time import time, strftime, localtime
9
10from zope.interface import implementer
11from twisted.python import components
12from twisted.application import service, strports
13from twisted.conch.ssh import factory, keys, session
14from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
15     FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
16     FX_BAD_MESSAGE, FX_FAILURE, FX_OK
17from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
18     FXF_CREAT, FXF_TRUNC, FXF_EXCL
19from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
20from twisted.conch.avatar import ConchUser
21from twisted.conch.openssh_compat import primes
22from twisted.cred import portal
23from twisted.internet.error import ProcessDone, ProcessTerminated
24from twisted.python.failure import Failure
25from twisted.internet.interfaces import ITransport
26
27from twisted.internet import defer
28from twisted.internet.interfaces import IConsumer
29from foolscap.api import eventually
30from allmydata.util import deferredutil
31
32from allmydata.util.assertutil import _assert, precondition
33from allmydata.util.consumer import download_to_data
34from allmydata.util.encodingutil import get_filesystem_encoding
35from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
36     NoSuchChildError, ChildOfWrongTypeError
37from allmydata.mutable.common import NotWriteableError
38from allmydata.mutable.publish import MutableFileHandle
39from allmydata.immutable.upload import FileHandle
40from allmydata.dirnode import update_metadata
41from allmydata.util.fileutil import EncryptedTemporaryFile
42
noisy = True  # module-wide switch: emit NOISY-level log messages throughout this file
44
45from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
46    msg as logmsg, PrefixingLogMixin
47
48
def createSFTPError(errorCode, errorMessage):
    """
    Return an SFTPError, accepting either text or bytes for the message.

    Twisted's SFTPError expects a native string for its message; byte
    messages (such as those produced by _utf8) are decoded as UTF-8 first.
    This replaces the former six.ensure_str call, which existed only for
    Python 2 compatibility; decoding strictly as UTF-8 matches its
    Python 3 behavior exactly.

    :param errorCode: one of the FX_* status codes from twisted.conch
    :param errorMessage: str or bytes describing the error
    """
    if isinstance(errorMessage, bytes):
        # strict UTF-8 decode, same as six.ensure_str did on Python 3
        errorMessage = errorMessage.decode("utf-8")
    return SFTPError(errorCode, errorMessage)
57
58
def eventually_callback(d):
    """Return a one-argument callable that fires d's callback on a later turn."""
    def _fire(result):
        eventually(d.callback, result)
    return _fire
61
def eventually_errback(d):
    """Return a one-argument callable that fires d's errback on a later turn."""
    def _fail(failure):
        eventually(d.errback, failure)
    return _fail
64
65
66def _utf8(x):
67    if isinstance(x, str):
68        return x.encode('utf-8')
69    if isinstance(x, bytes):
70        return x
71    return repr(x)
72
73
74def _to_sftp_time(t):
75    """SFTP times are unsigned 32-bit integers representing UTC seconds
76    (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
77    A Tahoe time is the corresponding float."""
78    return int(t) & int(0xFFFFFFFF)
79
80
def _convert_error(res, request):
    """If res is not a Failure, return it, otherwise reraise the appropriate
    SFTPError.

    :param res: an arbitrary result, or a twisted.python.failure.Failure
    :param request: short string describing the request, used for logging
    :raises SFTPError: always, when res is a Failure
    """

    if not isinstance(res, Failure):
        logged_res = res
        # Never log file contents; log only the length of data results.
        if isinstance(res, (bytes, str)): logged_res = "<data of length %r>" % (len(res),)
        logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
        return res

    err = res
    logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
    try:
        # BUGFIX: this previously called traceback.format_exc(err.value).
        # On Python 3, format_exc takes an int 'limit', so passing an
        # exception always raised TypeError and the noisy traceback log was
        # silently swallowed below. Failure.getTraceback() renders the
        # traceback that Failure captured when it was created.
        if noisy: logmsg(err.getTraceback(), level=NOISY)
    except Exception:  # pragma: no cover
        pass

    # The message argument to SFTPError must not reveal information that
    # might compromise anonymity, if we are running over an anonymous network.

    if err.check(SFTPError):
        # original raiser of SFTPError has responsibility to ensure anonymity
        raise err
    if err.check(NoSuchChildError):
        childname = _utf8(err.value.args[0])
        raise createSFTPError(FX_NO_SUCH_FILE, childname)
    if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
        msg = _utf8(err.value.args[0])
        raise createSFTPError(FX_PERMISSION_DENIED, msg)
    if err.check(ExistingChildError):
        # Versions of SFTP after v3 (which is what twisted.conch implements)
        # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
        # However v3 doesn't; instead, other servers such as sshd return
        # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
        # to translate the error to the equivalent of POSIX EEXIST, which is
        # necessary for some picky programs (such as gedit).
        msg = _utf8(err.value.args[0])
        raise createSFTPError(FX_FAILURE, msg)
    if err.check(NotImplementedError):
        raise createSFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
    if err.check(EOFError):
        raise createSFTPError(FX_EOF, "end of file reached")
    if err.check(defer.FirstError):
        # recursing always raises, so control never returns here
        _convert_error(err.value.subFailure, request)

    # We assume that the error message is not anonymity-sensitive.
    raise createSFTPError(FX_FAILURE, _utf8(err.value))
128
129
def _repr_flags(flags):
    """Render an SFTP open-flags bitmask as a '|'-joined string of flag names
    (e.g. 'FXF_READ|FXF_CREAT'); empty string when no known flag is set."""
    named_bits = [
        (FXF_READ,   "FXF_READ"),
        (FXF_WRITE,  "FXF_WRITE"),
        (FXF_APPEND, "FXF_APPEND"),
        (FXF_CREAT,  "FXF_CREAT"),
        (FXF_TRUNC,  "FXF_TRUNC"),
        (FXF_EXCL,   "FXF_EXCL"),
    ]
    return "|".join(name for bit, name in named_bits if flags & bit)
140
141
142def _lsLine(name, attrs):
143    st_uid = "tahoe"
144    st_gid = "tahoe"
145    st_mtime = attrs.get("mtime", 0)
146    st_mode = attrs["permissions"]
147
148    # Some clients won't tolerate '?' in the size field (#1337).
149    st_size = attrs.get("size", 0)
150
151    # We don't know how many links there really are to this object.
152    st_nlink = 1
153
154    # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
155    # We previously could not call the version in Twisted because we needed the change
156    # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
157    # Since we now depend on Twisted v10.1, consider calling Twisted's version.
158
159    mode = st_mode
160    perms = ["-"] * 10
161    ft = stat.S_IFMT(mode)
162    if   stat.S_ISDIR(ft): perms[0] = 'd'
163    elif stat.S_ISREG(ft): perms[0] = '-'
164    else: perms[0] = '?'
165    # user
166    if mode&stat.S_IRUSR: perms[1] = 'r'
167    if mode&stat.S_IWUSR: perms[2] = 'w'
168    if mode&stat.S_IXUSR: perms[3] = 'x'
169    # group
170    if mode&stat.S_IRGRP: perms[4] = 'r'
171    if mode&stat.S_IWGRP: perms[5] = 'w'
172    if mode&stat.S_IXGRP: perms[6] = 'x'
173    # other
174    if mode&stat.S_IROTH: perms[7] = 'r'
175    if mode&stat.S_IWOTH: perms[8] = 'w'
176    if mode&stat.S_IXOTH: perms[9] = 'x'
177    # suid/sgid never set
178
179    l = "".join(perms)
180    l += str(st_nlink).rjust(5) + ' '
181    un = str(st_uid)
182    l += un.ljust(9)
183    gr = str(st_gid)
184    l += gr.ljust(9)
185    sz = str(st_size)
186    l += sz.rjust(8)
187    l += ' '
188    day = 60 * 60 * 24
189    sixmo = day * 7 * 26
190    now = time()
191    if st_mtime + sixmo < now or st_mtime > now + day:
192        # mtime is more than 6 months ago, or more than one day in the future
193        l += strftime("%b %d  %Y ", localtime(st_mtime))
194    else:
195        l += strftime("%b %d %H:%M ", localtime(st_mtime))
196    l = l.encode("utf-8")
197    l += name
198    return l
199
200
201def _no_write(parent_readonly, child, metadata=None):
202    """Whether child should be listed as having read-only permissions in parent."""
203
204    if child.is_unknown():
205        return True
206    elif child.is_mutable():
207        return child.is_readonly()
208    elif parent_readonly or IDirectoryNode.providedBy(child):
209        return True
210    else:
211        return metadata is not None and metadata.get('no-write', False)
212
213
214def _populate_attrs(childnode, metadata, size=None):
215    attrs = {}
216
217    # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
218    # bits, otherwise the client may refuse to open a directory.
219    # Also, sshfs run as a non-root user requires files and directories
220    # to be world-readable/writeable.
221    # It is important that we never set the executable bits on files.
222    #
223    # Directories and unknown nodes have no size, and SFTP doesn't
224    # require us to make one up.
225    #
226    # childnode might be None, meaning that the file doesn't exist yet,
227    # but we're going to write it later.
228
229    if childnode and childnode.is_unknown():
230        perms = 0
231    elif childnode and IDirectoryNode.providedBy(childnode):
232        perms = S_IFDIR | 0o777
233    else:
234        # For files, omit the size if we don't immediately know it.
235        if childnode and size is None:
236            size = childnode.get_size()
237        if size is not None:
238            _assert(isinstance(size, int) and not isinstance(size, bool), size=size)
239            attrs['size'] = size
240        perms = S_IFREG | 0o666
241
242    if metadata:
243        if metadata.get('no-write', False):
244            perms &= S_IFDIR | S_IFREG | 0o555  # clear 'w' bits
245
246        # See webapi.txt for what these times mean.
247        # We would prefer to omit atime, but SFTP version 3 can only
248        # accept mtime if atime is also set.
249        if 'linkmotime' in metadata.get('tahoe', {}):
250            attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
251        elif 'mtime' in metadata:
252            attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
253
254        if 'linkcrtime' in metadata.get('tahoe', {}):
255            attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
256
257    attrs['permissions'] = perms
258
259    # twisted.conch.ssh.filetransfer only implements SFTP version 3,
260    # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
261
262    return attrs
263
264
265def _attrs_to_metadata(attrs):
266    metadata = {}
267
268    for key in attrs:
269        if key == "mtime" or key == "ctime" or key == "createtime":
270            metadata[key] = int(attrs[key])
271        elif key.startswith("ext_"):
272            metadata[key] = str(attrs[key])
273
274    perms = attrs.get('permissions', stat.S_IWUSR)
275    if not (perms & stat.S_IWUSR):
276        metadata['no-write'] = True
277
278    return metadata
279
280
def _direntry_for(filenode_or_parent, childname, filenode=None):
    """Return a bytes key identifying this entry (write URI, optionally with
    the child name appended), or None when no node / no write URI is available."""
    precondition(isinstance(childname, (str, type(None))), childname=childname)

    # With no childname, the direntry is keyed on the file node itself.
    node = filenode if childname is None else filenode_or_parent
    if not node:
        return None

    rw_uri = node.get_write_uri()
    if rw_uri and childname:
        return rw_uri + b"/" + childname.encode('utf-8')
    return rw_uri
295
296
@implementer(IConsumer)
class OverwriteableFileConsumer(PrefixingLogMixin):
    """I act both as a consumer for the download of the original file contents, and as a
    wrapper for a temporary file that records the downloaded data and any overwrites.
    I use a priority queue to keep track of which regions of the file have been overwritten
    but not yet downloaded, so that the download does not clobber overwritten data.
    I use another priority queue to record milestones at which to make callbacks
    indicating that a given number of bytes have been downloaded.

    The temporary file reflects the contents of the file that I represent, except that:
     - regions that have neither been downloaded nor overwritten, if present,
       contain garbage.
     - the temporary file may be shorter than the represented file (it is never longer).
       The latter's current size is stored in self.current_size.

    This abstraction is mostly independent of SFTP. Consider moving it, if it is found
    useful for other frontends."""

    def __init__(self, download_size, tempfile_maker):
        """
        :param download_size: expected size in bytes of the download
        :param tempfile_maker: zero-argument callable returning a new file-like
            object used to store downloaded and overwritten data
        """
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
        if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
        self.download_size = download_size
        self.current_size = download_size
        self.f = tempfile_maker()
        self.downloaded = 0
        self.milestones = []  # empty heap of (offset, d)
        self.overwrites = []  # empty heap of (start, end)
        self.is_closed = False

        self.done = defer.Deferred()
        # None -> not complete, Failure -> download failed, bytes -> download
        # succeeded (download_done asserts its argument is bytes or Failure)
        self.done_status = None
        self.producer = None

    def get_file(self):
        """Return the underlying temporary file object."""
        return self.f

    def get_current_size(self):
        """Return the current size of the represented file, in bytes."""
        return self.current_size

    def set_current_size(self, size):
        """Resize the represented file: truncate the temporary file if shrinking,
        or zero-fill (via overwrite) if growing, maintaining the invariant
        self.download_size <= self.current_size."""
        if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
                           (size, self.current_size, self.downloaded), level=NOISY)
        if size < self.current_size or size < self.downloaded:
            self.f.truncate(size)
        if size > self.current_size:
            self.overwrite(self.current_size, b"\x00" * (size - self.current_size))
        self.current_size = size

        # make the invariant self.download_size <= self.current_size be true again
        if size < self.download_size:
            self.download_size = size

        if self.downloaded >= self.download_size:
            self.download_done(b"size changed")

    def registerProducer(self, p, streaming):
        """IConsumer: accept a producer for the download. A non-streaming
        (pull) producer is driven by repeated eventual resumeProducing calls
        until the download completes."""
        if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
        if self.producer is not None:
            raise RuntimeError("producer is already registered")

        self.producer = p
        if streaming:
            # call resumeProducing once to start things off
            p.resumeProducing()
        else:
            def _iterate():
                # stop pulling once done_status is set (download finished or failed)
                if self.done_status is None:
                    p.resumeProducing()
                    eventually(_iterate)
            _iterate()

    def write(self, data):
        """IConsumer: receive the next chunk of downloaded data, writing it to
        the temporary file while skipping over any regions that have already
        been overwritten (tracked in the self.overwrites heap)."""
        if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
        if self.is_closed:
            return

        if self.downloaded >= self.download_size:
            return

        next_downloaded = self.downloaded + len(data)
        if next_downloaded > self.download_size:
            data = data[:(self.download_size - self.downloaded)]

        while len(self.overwrites) > 0:
            (start, end) = self.overwrites[0]
            if start >= next_downloaded:
                # This and all remaining overwrites are after the data we just downloaded.
                break
            if start > self.downloaded:
                # The data we just downloaded has been partially overwritten.
                # Write the prefix of it that precedes the overwritten region.
                self.f.seek(self.downloaded)
                self.f.write(data[:(start - self.downloaded)])

            # This merges consecutive overwrites if possible, which allows us to detect the
            # case where the download can be stopped early because the remaining region
            # to download has already been fully overwritten.
            heapq.heappop(self.overwrites)
            while len(self.overwrites) > 0:
                (start1, end1) = self.overwrites[0]
                if start1 > end:
                    break
                end = end1
                heapq.heappop(self.overwrites)

            if end >= next_downloaded:
                # This overwrite extends past the downloaded data, so there is no
                # more data to consider on this call.
                heapq.heappush(self.overwrites, (next_downloaded, end))
                self._update_downloaded(next_downloaded)
                return
            elif end >= self.downloaded:
                data = data[(end - self.downloaded):]
                self._update_downloaded(end)

        self.f.seek(self.downloaded)
        self.f.write(data)
        self._update_downloaded(next_downloaded)

    def _update_downloaded(self, new_downloaded):
        """Advance the download high-water mark and fire any milestone
        Deferreds at or below it; an overwrite region starting at or before
        the mark extends the effective milestone to its end."""
        self.downloaded = new_downloaded
        milestone = new_downloaded
        if len(self.overwrites) > 0:
            (start, end) = self.overwrites[0]
            if start <= new_downloaded and end > milestone:
                milestone = end

        while len(self.milestones) > 0:
            (next_, d) = self.milestones[0]
            if next_ > milestone:
                return
            if noisy: self.log("MILESTONE %r %r" % (next_, d), level=NOISY)
            heapq.heappop(self.milestones)
            eventually_callback(d)(b"reached")

        if milestone >= self.download_size:
            self.download_done(b"reached download size")

    def overwrite(self, offset, data):
        """Synchronously write data at offset on behalf of a client write
        request, recording the overwritten region so that write() will not
        later clobber it with downloaded data.

        :raises SFTPError: (FX_BAD_MESSAGE) if the handle is closed
        """
        if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
        if self.is_closed:
            self.log("overwrite called on a closed OverwriteableFileConsumer", level=WEIRD)
            raise createSFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")

        if offset > self.current_size:
            # Normally writing at an offset beyond the current end-of-file
            # would leave a hole that appears filled with zeroes. However, an
            # EncryptedTemporaryFile doesn't behave like that (if there is a
            # hole in the file on disk, the zeroes that are read back will be
            # XORed with the keystream). So we must explicitly write zeroes in
            # the gap between the current EOF and the offset.

            self.f.seek(self.current_size)
            self.f.write(b"\x00" * (offset - self.current_size))
            start = self.current_size
        else:
            self.f.seek(offset)
            start = offset

        self.f.write(data)
        end = offset + len(data)
        self.current_size = max(self.current_size, end)
        if end > self.downloaded:
            heapq.heappush(self.overwrites, (start, end))

    def read(self, offset, length):
        """When the data has been read, callback the Deferred that we return with this data.
        Otherwise errback the Deferred that we return.
        The caller must perform no more overwrites until the Deferred has fired."""

        if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
        if self.is_closed:
            self.log("read called on a closed OverwriteableFileConsumer", level=WEIRD)
            raise createSFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")

        # Note that the overwrite method is synchronous. When a write request is processed
        # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
        # be called and will update self.current_size if necessary before returning. Therefore,
        # self.current_size will be up-to-date for a subsequent call to this read method, and
        # so it is correct to do the check for a read past the end-of-file here.
        if offset >= self.current_size:
            def _eof(): raise EOFError("read past end of file")
            return defer.execute(_eof)

        if offset + length > self.current_size:
            length = self.current_size - offset
            if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)

        needed = min(offset + length, self.download_size)

        # If we fail to reach the needed number of bytes, the read request will fail.
        d = self.when_reached_or_failed(needed)
        def _reached_in_read(res):
            # It is not necessarily the case that self.downloaded >= needed, because
            # the file might have been truncated (thus truncating the download) and
            # then extended.

            _assert(self.current_size >= offset + length,
                    current_size=self.current_size, offset=offset, length=length)
            if noisy: self.log("_reached_in_read(%r), self.f = %r" % (res, self.f,), level=NOISY)
            self.f.seek(offset)
            return self.f.read(length)
        d.addCallback(_reached_in_read)
        return d

    def when_reached_or_failed(self, index):
        """Return a Deferred that fires when the download has reached at least
        index bytes, or immediately with done_status if the download has
        already completed (successfully or not)."""
        if noisy: self.log(".when_reached_or_failed(%r)" % (index,), level=NOISY)
        def _reached(res):
            if noisy: self.log("reached %r with result %r" % (index, res), level=NOISY)
            return res

        if self.done_status is not None:
            return defer.execute(_reached, self.done_status)
        if index <= self.downloaded:  # already reached successfully
            if noisy: self.log("already reached %r successfully" % (index,), level=NOISY)
            return defer.succeed("already reached successfully")
        d = defer.Deferred()
        d.addCallback(_reached)
        heapq.heappush(self.milestones, (index, d))
        return d

    def when_done(self):
        """Return a Deferred that fires with done_status once the download
        has completed (successfully or not)."""
        d = defer.Deferred()
        self.done.addCallback(lambda ign: eventually_callback(d)(self.done_status))
        return d

    def download_done(self, res):
        """Record the outcome of the download and release all waiters
        (milestone Deferreds and when_done observers).

        :param res: bytes describing success, or a Failure
        """
        _assert(isinstance(res, (bytes, Failure)), res=res)
        # Only the first call to download_done counts, but we log subsequent calls
        # (multiple calls are normal).
        if self.done_status is not None:
            self.log("IGNORING extra call to download_done with result %r; previous result was %r"
                     % (res, self.done_status), level=OPERATIONAL)
            return

        self.log("DONE with result %r" % (res,), level=OPERATIONAL)

        # We avoid errbacking self.done so that we are not left with an 'Unhandled error in Deferred'
        # in case when_done() is never called. Instead we stash the failure in self.done_status,
        # from where the callback added in when_done() can retrieve it.
        self.done_status = res
        eventually_callback(self.done)(None)

        while len(self.milestones) > 0:
            (next_, d) = self.milestones[0]
            if noisy: self.log("MILESTONE FINISH %r %r %r" % (next_, d, res), level=NOISY)
            heapq.heappop(self.milestones)
            # The callback means that the milestone has been reached if
            # it is ever going to be. Note that the file may have been
            # truncated to before the milestone.
            eventually_callback(d)(res)

    def close(self):
        """Close the temporary file (suppressing any error from doing so),
        mark the download done, and return the final done_status."""
        if not self.is_closed:
            self.is_closed = True
            try:
                self.f.close()
            except Exception as e:
                self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
        self.download_done(b"closed")
        return self.done_status

    def unregisterProducer(self):
        """IConsumer: the producer has finished or been detached."""
        # This will happen just before our client calls download_done, which will tell
        # us the outcome of the download; we don't know the outcome at this point.
        self.producer = None
        self.log("producer unregistered", level=NOISY)
564
565
# Size cutoff in bytes — presumably files at or below this are served via
# ShortReadOnlySFTPFile (whole-file download to memory) when opened read-only;
# confirm at the use site, which is outside this chunk.
SIZE_THRESHOLD = 1000
567
568
@implementer(ISFTPFile)
class ShortReadOnlySFTPFile(PrefixingLogMixin):
    """I represent a file handle to a particular file on an SFTP connection.
    I am used only for short immutable files opened in read-only mode.
    When I am created, the file contents start to be downloaded to memory.
    self.async_ is used to delay read requests until the download has finished."""

    def __init__(self, userpath, filenode, metadata):
        """
        :param userpath: bytes path as seen by the user (used as the log prefix)
        :param filenode: an IFileNode whose download starts immediately
        :param metadata: link metadata, used to populate getAttrs results
        """
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
        if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)

        precondition(isinstance(userpath, bytes) and IFileNode.providedBy(filenode),
                     userpath=userpath, filenode=filenode)
        self.filenode = filenode
        self.metadata = metadata
        # Deferred that fires with the complete file contents.
        self.async_ = download_to_data(filenode)
        self.closed = False

    def readChunk(self, offset, length):
        """Return a Deferred firing with up to length bytes starting at offset,
        once the in-memory download has completed; errbacks with FX_EOF if
        offset is at or past end-of-file, or FX_BAD_MESSAGE if closed."""
        request = ".readChunk(%r, %r)" % (offset, length)
        self.log(request, level=OPERATIONAL)

        if self.closed:
            def _closed(): raise createSFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
            return defer.execute(_closed)

        d = defer.Deferred()
        def _read(data):
            if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)

            # "In response to this request, the server will read as many bytes as it
            #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
            #  message.  If an error occurs or EOF is encountered before reading any
            #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
            #  files, it is guaranteed that this will read the specified number of
            #  bytes, or up to end of file."
            #
            # i.e. we respond with an EOF error iff offset is already at EOF.

            if offset >= len(data):
                eventually_errback(d)(Failure(createSFTPError(FX_EOF, "read at or past end of file")))
            else:
                eventually_callback(d)(data[offset:offset+length])  # truncated if offset+length > len(data)
            # return data unchanged so later readChunk calls see the full contents
            return data
        self.async_.addCallbacks(_read, eventually_errback(d))
        d.addBoth(_convert_error, request)
        return d

    def writeChunk(self, offset, data):
        """Deny the write: this handle is read-only."""
        self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)

        def _denied(): raise createSFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)

    def close(self):
        """Mark the handle closed; the in-memory download is simply dropped."""
        self.log(".close()", level=OPERATIONAL)

        self.closed = True
        return defer.succeed(None)

    def getAttrs(self):
        """Return a Deferred firing with the SFTP attribute dict for this file."""
        request = ".getAttrs()"
        self.log(request, level=OPERATIONAL)

        if self.closed:
            def _closed(): raise createSFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
            return defer.execute(_closed)

        d = defer.execute(_populate_attrs, self.filenode, self.metadata)
        d.addBoth(_convert_error, request)
        return d

    def setAttrs(self, attrs):
        """Deny the attribute change: this handle is read-only."""
        self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
        def _denied(): raise createSFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
        return defer.execute(_denied)
645
646
647@implementer(ISFTPFile)
648class GeneralSFTPFile(PrefixingLogMixin):
649    """I represent a file handle to a particular file on an SFTP connection.
650    I wrap an instance of OverwriteableFileConsumer, which is responsible for
651    storing the file contents. In order to allow write requests to be satisfied
652    immediately, there is effectively a FIFO queue between requests made to this
653    file handle, and requests to my OverwriteableFileConsumer. This queue is
654    implemented by the callback chain of self.async_.
655
656    When first constructed, I am in an 'unopened' state that causes most
657    operations to be delayed until 'open' is called."""
658
    def __init__(self, userpath, flags, close_notify, convergence):
        """
        :param userpath: bytes path as seen by the user (used as the log prefix)
        :param flags: FXF_* open-flags bitmask
        :param close_notify: callable — presumably invoked when this handle is
            closed (the close method is outside this chunk; confirm there)
        :param convergence: upload convergence secret — deliberately censored
            from log output
        """
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
        if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
                           (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)

        precondition(isinstance(userpath, bytes), userpath=userpath)
        self.userpath = userpath
        self.flags = flags
        self.close_notify = close_notify
        self.convergence = convergence
        # FIFO queue of operations on this handle, implemented as a callback chain.
        self.async_ = defer.Deferred()
        # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
        self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
        self.closed = False
        self.abandoned = False
        self.parent = None
        self.childname = None
        self.filenode = None
        self.metadata = None

        # self.consumer should only be relied on in callbacks for self.async_, since it might
        # not be set before then.
        self.consumer = None
682
    def open(self, parent=None, childname=None, filenode=None, metadata=None):  # noqa: F811
        """Transition out of the 'unopened' state: set up the consumer and,
        unless truncating or creating, start downloading the existing contents
        into it. Fires self.async_ so queued operations can proceed.

        :param parent: directory node containing this file, or None
        :param childname: str name within parent, or None
        :param filenode: existing IFileNode, or None if the file is new
        :param metadata: link metadata, or None
        :returns: self, so the caller can chain on the opened handle
        """
        self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                 (parent, childname, filenode, metadata), level=OPERATIONAL)

        precondition(isinstance(childname, (str, type(None))), childname=childname)
        precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
        precondition(not self.closed, sftpfile=self)

        # If the file has been renamed, the new (parent, childname) takes precedence.
        if self.parent is None:
            self.parent = parent
        if self.childname is None:
            self.childname = childname
        self.filenode = filenode
        self.metadata = metadata

        tempfile_maker = EncryptedTemporaryFile

        if (self.flags & FXF_TRUNC) or not filenode:
            # We're either truncating or creating the file, so we don't need the old contents.
            self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
            self.consumer.download_done(b"download not needed")
        else:
            self.async_.addCallback(lambda ignored: filenode.get_best_readable_version())

            def _read(version):
                if noisy: self.log("_read", level=NOISY)
                download_size = version.get_size()
                _assert(download_size is not None)

                self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)

                d = version.read(self.consumer, 0, None)
                def _finished(res):
                    if not isinstance(res, Failure):
                        res = b"download finished"
                    self.consumer.download_done(res)
                d.addBoth(_finished)
                # It is correct to drop d here.
            self.async_.addCallback(_read)

        # Start the queued operations (and the download, if any) on a later turn.
        eventually_callback(self.async_)(None)

        if noisy: self.log("open done", level=NOISY)
        return self
728
729    def get_userpath(self):
730        return self.userpath
731
732    def get_direntry(self):
733        return _direntry_for(self.parent, self.childname)
734
735    def rename(self, new_userpath, new_parent, new_childname):
736        self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
737
738        precondition(isinstance(new_userpath, bytes) and isinstance(new_childname, str),
739                     new_userpath=new_userpath, new_childname=new_childname)
740        self.userpath = new_userpath
741        self.parent = new_parent
742        self.childname = new_childname
743
744    def abandon(self):
745        self.log(".abandon()", level=OPERATIONAL)
746
747        self.abandoned = True
748
749    def sync(self, ign=None):
750        # The ign argument allows some_file.sync to be used as a callback.
751        self.log(".sync()", level=OPERATIONAL)
752
753        d = defer.Deferred()
754        self.async_.addBoth(eventually_callback(d))
755        def _done(res):
756            if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
757            return res
758        d.addBoth(_done)
759        return d
760
    def readChunk(self, offset, length):
        """
        Read up to 'length' bytes starting at 'offset'.

        Returns a Deferred firing with the data, or with an SFTPError if the
        handle was not opened for reading or is already closed. The read is
        queued on self.async_, so it waits for the download and any prior
        writes to reach the requested range.
        """
        request = ".readChunk(%r, %r)" % (offset, length)
        self.log(request, level=OPERATIONAL)

        if not (self.flags & FXF_READ):
            def _denied(): raise createSFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
            return defer.execute(_denied)

        if self.closed:
            def _closed(): raise createSFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
            return defer.execute(_closed)

        d = defer.Deferred()
        def _read(ign):
            if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
            d2 = self.consumer.read(offset, length)
            # Relay the consumer's result to d in a later reactor turn.
            d2.addBoth(eventually_callback(d))
            # It is correct to drop d2 here.
            return None
        self.async_.addCallbacks(_read, eventually_errback(d))
        d.addBoth(_convert_error, request)
        return d
783
    def writeChunk(self, offset, data):
        """
        Queue a write of 'data' at 'offset' (or at the current end of file
        when FXF_APPEND is set) and return an already-fired Deferred.

        The write itself happens later via self.async_; any failure surfaces
        through a subsequent read or the close, as SFTP permits.
        """
        self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)

        if not (self.flags & FXF_WRITE):
            def _denied(): raise createSFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
            return defer.execute(_denied)

        if self.closed:
            def _closed(): raise createSFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
            return defer.execute(_closed)

        self.has_changed = True

        # Note that we return without waiting for the write to occur. Reads and
        # close wait for prior writes, and will fail if any prior operation failed.
        # This is ok because SFTP makes no guarantee that the write completes
        # before the request does. In fact it explicitly allows write errors to be
        # delayed until close:
        #   "One should note that on some server platforms even a close can fail.
        #    This can happen e.g. if the server operating system caches writes,
        #    and an error occurs while flushing cached writes during the close."

        def _write(ign):
            if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
                               (offset, len(data), self.consumer.get_current_size()), level=NOISY)
            # FXF_APPEND means that we should always write at the current end of file.
            write_offset = offset
            if self.flags & FXF_APPEND:
                write_offset = self.consumer.get_current_size()

            self.consumer.overwrite(write_offset, data)
            if noisy: self.log("overwrite done", level=NOISY)
            return None
        self.async_.addCallback(_write)
        # don't addErrback to self.async_, just allow subsequent async ops to fail.
        return defer.succeed(None)
820
    def _do_close(self, res, d=None):
        """
        Finish closing: close the consumer, run close_notify, then either
        fire 'd' with the outcome or log a suppressed failure.

        Used as a callback on self.async_, so 'res' may be a result or a
        Failure and is passed through.
        """
        if noisy: self.log("_do_close(%r)" % (res,), level=NOISY)
        status = None
        if self.consumer:
            status = self.consumer.close()

        # We must close_notify before re-firing self.async_.
        if self.close_notify:
            self.close_notify(self.userpath, self.parent, self.childname, self)

        # A failure from closing the consumer wins only if the chain itself succeeded.
        if not isinstance(res, Failure) and isinstance(status, Failure):
            res = status

        if d:
            eventually_callback(d)(res)
        elif isinstance(res, Failure):
            self.log("suppressing %r" % (res,), level=OPERATIONAL)
838
    def close(self):
        """
        Close this handle, committing (uploading/publishing) written data.

        Returns a Deferred that fires when the close has completed. A handle
        opened only for reading always closes successfully; an abandoned or
        unchanged handle skips the commit. Calling close again after the
        first call returns an immediately-successful Deferred.
        """
        request = ".close()"
        self.log(request, level=OPERATIONAL)

        if self.closed:
            return defer.succeed(None)

        # This means that close has been called, not that the close has succeeded.
        self.closed = True

        if not (self.flags & (FXF_WRITE | FXF_CREAT)):
            # We never fail a close of a handle opened only for reading, even if the file
            # failed to download. (We could not do so deterministically, because it would
            # depend on whether we reached the point of failure before abandoning the
            # download.) Any reads that depended on file content that could not be downloaded
            # will have failed. It is important that we don't close the consumer until
            # previous read operations have completed.
            self.async_.addBoth(self._do_close)
            return defer.succeed(None)

        # We must capture the abandoned, parent, and childname variables synchronously
        # at the close call. This is needed by the correctness arguments in the comments
        # for _abandon_any_heisenfiles and _rename_heisenfiles.
        # Note that the file must have been opened before it can be closed.
        abandoned = self.abandoned
        parent = self.parent
        childname = self.childname

        # has_changed is set when writeChunk is called, not when the write occurs, so
        # it is correct to optimize out the commit if it is False at the close call.
        has_changed = self.has_changed

        def _commit(ign):
            d2 = self.consumer.when_done()
            if self.filenode and self.filenode.is_mutable():
                self.log("update mutable file %r childname=%r metadata=%r"
                         % (self.filenode, childname, self.metadata), level=OPERATIONAL)
                if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
                    _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
                    d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))

                d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
            else:
                def _add_file(ign):
                    self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
                    u = FileHandle(self.consumer.get_file(), self.convergence)
                    return parent.add_file(childname, u, metadata=self.metadata)
                d2.addCallback(_add_file)
            return d2

        # If the file has been abandoned, we don't want the close operation to get "stuck",
        # even if self.async_ fails to re-fire. Completing the close independently of self.async_
        # in that case should ensure that dropping an ssh connection is sufficient to abandon
        # any heisenfiles that were not explicitly closed in that connection.
        if abandoned or not has_changed:
            d = defer.succeed(None)
            self.async_.addBoth(self._do_close)
        else:
            d = defer.Deferred()
            self.async_.addCallback(_commit)
            self.async_.addBoth(self._do_close, d)
        d.addBoth(_convert_error, request)
        return d
902
903    def getAttrs(self):
904        request = ".getAttrs()"
905        self.log(request, level=OPERATIONAL)
906
907        if self.closed:
908            def _closed(): raise createSFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
909            return defer.execute(_closed)
910
911        # Optimization for read-only handles, when we already know the metadata.
912        if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
913            return defer.succeed(_populate_attrs(self.filenode, self.metadata))
914
915        d = defer.Deferred()
916        def _get(ign):
917            if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
918
919            # self.filenode might be None, but that's ok.
920            attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
921            eventually_callback(d)(attrs)
922            return None
923        self.async_.addCallbacks(_get, eventually_errback(d))
924        d.addBoth(_convert_error, request)
925        return d
926
    def setAttrs(self, attrs, only_if_at=None):
        """
        Update this open file's metadata, and its size if attrs includes one.

        If only_if_at is given, the update is skipped unless the file is
        currently at that direntry (used by _update_attrs_for_heisenfiles).
        Returns a Deferred firing with None, or with an SFTPError for a
        read-only or closed handle, or an invalid size.
        """
        request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
        self.log(request, level=OPERATIONAL)

        if not (self.flags & FXF_WRITE):
            def _denied(): raise createSFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
            return defer.execute(_denied)

        if self.closed:
            def _closed(): raise createSFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
            return defer.execute(_closed)

        size = attrs.get("size", None)
        if size is not None and (not isinstance(size, int) or size < 0):
            def _bad(): raise createSFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
            return defer.execute(_bad)

        d = defer.Deferred()
        def _set(ign):
            if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
            current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
            if only_if_at and only_if_at != current_direntry:
                if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
                                   (current_direntry, request), level=NOISY)
                return None

            now = time()
            self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
            if size is not None:
                # TODO: should we refuse to truncate a file opened with FXF_APPEND?
                # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
                self.consumer.set_current_size(size)
            eventually_callback(d)(None)
            return None
        self.async_.addCallbacks(_set, eventually_errback(d))
        d.addBoth(_convert_error, request)
        return d
964
965
class StoppableList(object):
    """An iterable wrapper that also offers the close() method expected of
    directory listings; closing it is a no-op."""

    def __init__(self, items):
        self.items = items

    def __iter__(self):
        return iter(self.items)

    def close(self):
        # Nothing to release.
        pass
974
975
class Reason(object):
    """Minimal Failure-like wrapper exposing only a .value attribute."""
    def __init__(self, value):
        self.value = value
979
980
# A "heisenfile" is a file that has been opened with write flags
# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
# 'all_heisenfiles' maps from a direntry string to a list of
# GeneralSFTPFile objects; it is shared across all users/connections.
#
# A direntry string is parent_write_uri + "/" + childname_utf8 for
# an immutable file, or file_write_uri for a mutable file.
# Updates to this dict are single-threaded.

all_heisenfiles = {}
991
def _reload():
    """Reset the module-level heisenfile table to empty."""
    global all_heisenfiles
    all_heisenfiles = {}
995
996@implementer(ISFTPServer)
997class SFTPUserHandler(ConchUser, PrefixingLogMixin):
    def __init__(self, client, rootnode, username):
        """
        Per-user SFTP avatar, created after authentication, serving the
        given user's root dirnode via the given Tahoe client.
        """
        ConchUser.__init__(self)
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
        if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)

        # Let Conch route 'session' channels and the 'sftp' subsystem to us.
        self.channelLookup[b"session"] = session.SSHSession
        self.subsystemLookup[b"sftp"] = FileTransferServer

        self._client = client
        self._root = rootnode
        self._username = username
        self._convergence = client.convergence

        # maps from UTF-8 paths for this user, to files written and still open
        self._heisenfiles = {}
1013
1014    def gotVersion(self, otherVersion, extData):
1015        self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
1016
1017        # advertise the same extensions as the OpenSSH SFTP server
1018        # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1019        return {'posix-rename@openssh.com': '1',
1020                'statvfs@openssh.com': '2',
1021                'fstatvfs@openssh.com': '2',
1022               }
1023
1024    def logout(self):
1025        self.log(".logout()", level=OPERATIONAL)
1026
1027        for files in self._heisenfiles.values():
1028            for f in files:
1029                f.abandon()
1030
1031    def _add_heisenfile_by_path(self, file):
1032        self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
1033
1034        userpath = file.get_userpath()
1035        if userpath in self._heisenfiles:
1036            self._heisenfiles[userpath] += [file]
1037        else:
1038            self._heisenfiles[userpath] = [file]
1039
1040    def _add_heisenfile_by_direntry(self, file):
1041        self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
1042
1043        direntry = file.get_direntry()
1044        if direntry:
1045            if direntry in all_heisenfiles:
1046                all_heisenfiles[direntry] += [file]
1047            else:
1048                all_heisenfiles[direntry] = [file]
1049
    def _abandon_any_heisenfiles(self, userpath, direntry):
        """
        Abandon and then sync every heisenfile matching userpath or direntry,
        removing them from both indexes. Returns a Deferred firing with True
        if any files were abandoned.
        """
        request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
        self.log(request, level=OPERATIONAL)

        precondition(isinstance(userpath, bytes), userpath=userpath)

        # First we synchronously mark all heisenfiles matching the userpath or direntry
        # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
        # each file that we abandoned.
        #
        # For each file, the call to .abandon() occurs:
        #   * before the file is closed, in which case it will never be committed
        #     (uploaded+linked or published); or
        #   * after it is closed but before it has been close_notified, in which case the
        #     .sync() ensures that it has been committed (successfully or not) before we
        #     return.
        #
        # This avoids a race that might otherwise cause the file to be committed after
        # the remove operation has completed.
        #
        # We return a Deferred that fires with True if any files were abandoned (this
        # does not mean that they were not committed; it is used to determine whether
        # a NoSuchChildError from the attempt to delete the file should be suppressed).

        files = []
        if direntry in all_heisenfiles:
            files = all_heisenfiles[direntry]
            del all_heisenfiles[direntry]
        if userpath in self._heisenfiles:
            files += self._heisenfiles[userpath]
            del self._heisenfiles[userpath]

        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)

        for f in files:
            f.abandon()

        d = defer.succeed(None)
        for f in files:
            d.addBoth(f.sync)

        def _done(ign):
            self.log("done %r" % (request,), level=OPERATIONAL)
            return len(files) > 0
        d.addBoth(_done)
        return d
1096
    def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
                            to_userpath, to_parent, to_childname, overwrite=True):
        """
        Re-index every heisenfile matching the 'from' userpath or direntry to
        the 'to' location, then sync each one. Returns a Deferred firing with
        True if any files were renamed, or failing with FX_PERMISSION_DENIED
        when overwrite is False and heisenfiles already exist at the
        destination path or direntry.
        """
        request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
                   (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
        self.log(request, level=OPERATIONAL)

        precondition((isinstance(from_userpath, bytes) and isinstance(from_childname, str) and
                      isinstance(to_userpath, bytes) and isinstance(to_childname, str)),
                     from_userpath=from_userpath, from_childname=from_childname, to_userpath=to_userpath, to_childname=to_childname)

        if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)

        # First we synchronously rename all heisenfiles matching the userpath or direntry.
        # Then we .sync() each file that we renamed.
        #
        # For each file, the call to .rename occurs:
        #   * before the file is closed, in which case it will be committed at the
        #     new direntry; or
        #   * after it is closed but before it has been close_notified, in which case the
        #     .sync() ensures that it has been committed (successfully or not) before we
        #     return.
        #
        # This avoids a race that might otherwise cause the file to be committed at the
        # old name after the rename operation has completed.
        #
        # Note that if overwrite is False, the caller should already have checked
        # whether a real direntry exists at the destination. It is possible that another
        # direntry (heisen or real) comes to exist at the destination after that check,
        # but in that case it is correct for the rename to succeed (and for the commit
        # of the heisenfile at the destination to possibly clobber the other entry, since
        # that can happen anyway when we have concurrent write handles to the same direntry).
        #
        # We return a Deferred that fires with True if any files were renamed (this
        # does not mean that they were not committed; it is used to determine whether
        # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
        # is False and there were already heisenfiles at the destination userpath or
        # direntry, we return a Deferred that fails with createSFTPError(FX_PERMISSION_DENIED).

        from_direntry = _direntry_for(from_parent, from_childname)
        to_direntry = _direntry_for(to_parent, to_childname)

        if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
                           (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)

        if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
            def _existing(): raise createSFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + str(to_userpath, "utf-8"))
            if noisy: self.log("existing", level=NOISY)
            return defer.execute(_existing)

        from_files = []
        if from_direntry in all_heisenfiles:
            from_files = all_heisenfiles[from_direntry]
            del all_heisenfiles[from_direntry]
        if from_userpath in self._heisenfiles:
            from_files += self._heisenfiles[from_userpath]
            del self._heisenfiles[from_userpath]

        if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)

        for f in from_files:
            f.rename(to_userpath, to_parent, to_childname)
            self._add_heisenfile_by_path(f)
            self._add_heisenfile_by_direntry(f)

        d = defer.succeed(None)
        for f in from_files:
            d.addBoth(f.sync)

        def _done(ign):
            if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
                               (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
            return len(from_files) > 0
        d.addBoth(_done)
        return d
1171
1172    def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1173        request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1174        self.log(request, level=OPERATIONAL)
1175
1176        _assert(isinstance(userpath, bytes) and isinstance(direntry, bytes),
1177                userpath=userpath, direntry=direntry)
1178
1179        files = []
1180        if direntry in all_heisenfiles:
1181            files = all_heisenfiles[direntry]
1182        if userpath in self._heisenfiles:
1183            files += self._heisenfiles[userpath]
1184
1185        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1186
1187        # We set the metadata for all heisenfiles at this path or direntry.
1188        # Since a direntry includes a write URI, we must have authority to
1189        # change the metadata of heisenfiles found in the all_heisenfiles dict.
1190        # However that's not necessarily the case for heisenfiles found by
1191        # path. Therefore we tell the setAttrs method of each file to only
1192        # perform the update if the file is at the correct direntry.
1193
1194        d = defer.succeed(None)
1195        for f in files:
1196            d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1197
1198        def _done(ign):
1199            self.log("done %r" % (request,), level=OPERATIONAL)
1200            # TODO: this should not return True if only_if_at caused all files to be skipped.
1201            return len(files) > 0
1202        d.addBoth(_done)
1203        return d
1204
1205    def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1206        request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1207        self.log(request, level=OPERATIONAL)
1208
1209        _assert(isinstance(userpath, bytes) and isinstance(direntry, (bytes, type(None))),
1210                userpath=userpath, direntry=direntry)
1211
1212        files = []
1213        if direntry in all_heisenfiles:
1214            files = all_heisenfiles[direntry]
1215        if userpath in self._heisenfiles:
1216            files += self._heisenfiles[userpath]
1217
1218        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1219
1220        d = defer.succeed(None)
1221        for f in files:
1222            if f is not ignore:
1223                d.addBoth(f.sync)
1224
1225        def _done(ign):
1226            self.log("done %r" % (request,), level=OPERATIONAL)
1227            return None
1228        d.addBoth(_done)
1229        return d
1230
1231    def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1232        if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1233
1234        _assert(isinstance(userpath, bytes) and isinstance(childname, (str, type(None))),
1235                userpath=userpath, childname=childname)
1236
1237        direntry = _direntry_for(parent, childname)
1238        if direntry in all_heisenfiles:
1239            all_old_files = all_heisenfiles[direntry]
1240            all_new_files = [f for f in all_old_files if f is not file_to_remove]
1241            if len(all_new_files) > 0:
1242                all_heisenfiles[direntry] = all_new_files
1243            else:
1244                del all_heisenfiles[direntry]
1245
1246        if userpath in self._heisenfiles:
1247            old_files = self._heisenfiles[userpath]
1248            new_files = [f for f in old_files if f is not file_to_remove]
1249            if len(new_files) > 0:
1250                self._heisenfiles[userpath] = new_files
1251            else:
1252                del self._heisenfiles[userpath]
1253
1254        if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1255
    def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
        """
        Choose and open the file-handle implementation for an openFile
        request, after syncing any other heisenfiles at the same path or
        direntry. Small immutable files opened read-only get a
        ShortReadOnlySFTPFile; everything else reuses existing_file or
        creates a GeneralSFTPFile, which is indexed by direntry when opened
        for writing. Returns a Deferred firing with the file object.
        """
        if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                           (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
                           level=NOISY)

        _assert((isinstance(userpath, bytes) and isinstance(childname, (str, type(None))) and
                 (metadata is None or 'no-write' in metadata)),
                userpath=userpath, childname=childname, metadata=metadata)

        writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
        direntry = _direntry_for(parent, childname, filenode)

        # Make sure any previously-written data at this location is committed first.
        d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)

        if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
            d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
        else:
            close_notify = None
            if writing:
                close_notify = self._remove_heisenfile

            d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
            def _got_file(file):
                file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
                if writing:
                    self._add_heisenfile_by_direntry(file)
                return file
            d.addCallback(_got_file)
        return d
1285
1286    def openFile(self, pathstring, flags, attrs, delay=None):
1287        request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1288        self.log(request, level=OPERATIONAL)
1289
1290        # This is used for both reading and writing.
1291        # First exclude invalid combinations of flags, and empty paths.
1292
1293        if not (flags & (FXF_READ | FXF_WRITE)):
1294            def _bad_readwrite():
1295                raise createSFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1296            return defer.execute(_bad_readwrite)
1297
1298        if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1299            def _bad_exclcreat():
1300                raise createSFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1301            return defer.execute(_bad_exclcreat)
1302
1303        path = self._path_from_string(pathstring)
1304        if not path:
1305            def _emptypath(): raise createSFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1306            return defer.execute(_emptypath)
1307
1308        # The combination of flags is potentially valid.
1309
1310        # To work around clients that have race condition bugs, a getAttr, rename, or
1311        # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1312        # should succeed even if the 'open' request has not yet completed. So we now
1313        # synchronously add a file object into the self._heisenfiles dict, indexed
1314        # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1315        # because we don't yet have a user-independent path for the file.) The file
1316        # object does not know its filenode, parent, or childname at this point.
1317
1318        userpath = self._path_to_utf8(path)
1319
1320        if flags & (FXF_WRITE | FXF_CREAT):
1321            file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1322            self._add_heisenfile_by_path(file)
1323        else:
1324            # We haven't decided which file implementation to use yet.
1325            file = None
1326
1327        desired_metadata = _attrs_to_metadata(attrs)
1328
1329        # Now there are two major cases:
1330        #
1331        #  1. The path is specified as /uri/FILECAP, with no parent directory.
1332        #     If the FILECAP is mutable and writeable, then we can open it in write-only
1333        #     or read/write mode (non-exclusively), otherwise we can only open it in
1334        #     read-only mode. The open should succeed immediately as long as FILECAP is
1335        #     a valid known filecap that grants the required permission.
1336        #
1337        #  2. The path is specified relative to a parent. We find the parent dirnode and
1338        #     get the child's URI and metadata if it exists. There are four subcases:
1339        #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1340        #          to write to the parent directory.
1341        #       b. the child exists but is not a valid known filecap: fail
1342        #       c. the child is mutable: if we are trying to open it write-only or
1343        #          read/write, then we must be able to write to the file.
1344        #       d. the child is immutable: if we are trying to open it write-only or
1345        #          read/write, then we must be able to write to the parent directory.
1346        #
1347        # To reduce latency, open normally succeeds as soon as these conditions are
1348        # met, even though there might be a failure in downloading the existing file
1349        # or uploading a new one. However, there is an exception: if a file has been
1350        # written, then closed, and is now being reopened, then we have to delay the
1351        # open until the previous upload/publish has completed. This is necessary
1352        # because sshfs does not wait for the result of an FXF_CLOSE message before
1353        # reporting to the client that a file has been closed. It applies both to
1354        # mutable files, and to directory entries linked to an immutable file.
1355        #
1356        # Note that the permission checks below are for more precise error reporting on
1357        # the open call; later operations would fail even if we did not make these checks.
1358
1359        d = delay or defer.succeed(None)
1360        d.addCallback(lambda ign: self._get_root(path))
1361        def _got_root(root_and_path):
1362            (root, path) = root_and_path
1363            if root.is_unknown():
1364                raise createSFTPError(FX_PERMISSION_DENIED,
1365                                "cannot open an unknown cap (or child of an unknown object). "
1366                                "Upgrading the gateway to a later Tahoe-LAFS version may help")
1367            if not path:
1368                # case 1
1369                if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1370                if not IFileNode.providedBy(root):
1371                    raise createSFTPError(FX_PERMISSION_DENIED,
1372                                    "cannot open a directory cap")
1373                if (flags & FXF_WRITE) and root.is_readonly():
1374                    raise createSFTPError(FX_PERMISSION_DENIED,
1375                                    "cannot write to a non-writeable filecap without a parent directory")
1376                if flags & FXF_EXCL:
1377                    raise createSFTPError(FX_FAILURE,
1378                                    "cannot create a file exclusively when it already exists")
1379
1380                # The file does not need to be added to all_heisenfiles, because it is not
1381                # associated with a directory entry that needs to be updated.
1382
1383                metadata = update_metadata(None, desired_metadata, time())
1384
1385                # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1386                # given that we don't actually have a parent. This only affects the permissions
1387                # reported by a getAttrs on this file handle in the case of an immutable file.
1388                # We choose 'parent_readonly=True' since that will cause the permissions to be
1389                # reported as r--r--r--, which is appropriate because an immutable file can't be
1390                # written via this path.
1391
1392                metadata['no-write'] = _no_write(True, root)
1393                return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1394            else:
1395                # case 2
1396                childname = path[-1]
1397
1398                if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1399                                   (root, childname, desired_metadata, path[:-1]), level=NOISY)
1400                d2 = root.get_child_at_path(path[:-1])
1401                def _got_parent(parent):
1402                    if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1403                    if parent.is_unknown():
1404                        raise createSFTPError(FX_PERMISSION_DENIED,
1405                                        "cannot open a child of an unknown object. "
1406                                        "Upgrading the gateway to a later Tahoe-LAFS version may help")
1407
1408                    parent_readonly = parent.is_readonly()
1409                    d3 = defer.succeed(None)
1410                    if flags & FXF_EXCL:
1411                        # FXF_EXCL means that the link to the file (not the file itself) must
1412                        # be created atomically wrt updates by this storage client.
1413                        # That is, we need to create the link before returning success to the
1414                        # SFTP open request (and not just on close, as would normally be the
1415                        # case). We make the link initially point to a zero-length LIT file,
1416                        # which is consistent with what might happen on a POSIX filesystem.
1417
1418                        if parent_readonly:
1419                            raise createSFTPError(FX_FAILURE,
1420                                            "cannot create a file exclusively when the parent directory is read-only")
1421
1422                        # 'overwrite=False' ensures failure if the link already exists.
1423                        # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1424
1425                        zero_length_lit = b"URI:LIT:"
1426                        if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1427                                           (parent, zero_length_lit, childname), level=NOISY)
1428                        d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1429                                                                  metadata=desired_metadata, overwrite=False))
1430                        def _seturi_done(child):
1431                            if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1432                            d4 = parent.get_metadata_for(childname)
1433                            d4.addCallback(lambda metadata: (child, metadata))
1434                            return d4
1435                        d3.addCallback(_seturi_done)
1436                    else:
1437                        if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1438                        d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1439
1440                    def _got_child(filenode_and_current_metadata):
1441                        (filenode, current_metadata) = filenode_and_current_metadata
1442                        if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1443
1444                        metadata = update_metadata(current_metadata, desired_metadata, time())
1445
1446                        # Ignore the permissions of the desired_metadata in an open call. The permissions
1447                        # can only be set by setAttrs.
1448                        metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1449
1450                        if filenode.is_unknown():
1451                            raise createSFTPError(FX_PERMISSION_DENIED,
1452                                            "cannot open an unknown cap. Upgrading the gateway "
1453                                            "to a later Tahoe-LAFS version may help")
1454                        if not IFileNode.providedBy(filenode):
1455                            raise createSFTPError(FX_PERMISSION_DENIED,
1456                                            "cannot open a directory as if it were a file")
1457                        if (flags & FXF_WRITE) and metadata['no-write']:
1458                            raise createSFTPError(FX_PERMISSION_DENIED,
1459                                            "cannot open a non-writeable file for writing")
1460
1461                        return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1462                                               filenode=filenode, metadata=metadata)
1463                    def _no_child(f):
1464                        if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1465                        f.trap(NoSuchChildError)
1466
1467                        if not (flags & FXF_CREAT):
1468                            raise createSFTPError(FX_NO_SUCH_FILE,
1469                                            "the file does not exist, and was not opened with the creation (CREAT) flag")
1470                        if parent_readonly:
1471                            raise createSFTPError(FX_PERMISSION_DENIED,
1472                                            "cannot create a file when the parent directory is read-only")
1473
1474                        return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1475                    d3.addCallbacks(_got_child, _no_child)
1476                    return d3
1477
1478                d2.addCallback(_got_parent)
1479                return d2
1480
1481        d.addCallback(_got_root)
1482        def _remove_on_error(err):
1483            if file:
1484                self._remove_heisenfile(userpath, None, None, file)
1485            return err
1486        d.addErrback(_remove_on_error)
1487        d.addBoth(_convert_error, request)
1488        return d
1489
    def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
        """
        Handle SFTP RENAME (overwrite=False) or the posix-rename@openssh.com
        extension (overwrite=True): move the directory entry at
        from_pathstring to to_pathstring.

        Any open heisenfiles recorded under the 'from' direntry are redirected
        so that their eventual writes land at the 'to' direntry. Returns a
        Deferred that fires (with None) on success or with a converted
        SFTPError on failure.
        """
        request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
        self.log(request, level=OPERATIONAL)

        from_path = self._path_from_string(from_pathstring)
        to_path = self._path_from_string(to_pathstring)
        from_userpath = self._path_to_utf8(from_path)
        to_userpath = self._path_to_utf8(to_path)

        # the target directory must already exist
        d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
                                        self._get_parent_or_node(to_path)])
        def _got(from_pair_and_to_pair):
            (from_pair, to_pair) = from_pair_and_to_pair
            if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
                               (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
            (from_parent, from_childname) = from_pair
            (to_parent, to_childname) = to_pair

            # A childname of None means the path resolved to a node addressed
            # directly by URI, which has no parent entry to rename.
            if from_childname is None:
                raise createSFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
            if to_childname is None:
                raise createSFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")

            # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
            # "It is an error if there already exists a file with the name specified
            #  by newpath."
            # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
            #
            # For the standard SSH_FXP_RENAME operation, overwrite=False.
            # We also support the posix-rename@openssh.com extension, which uses overwrite=True.

            d2 = defer.succeed(None)
            if not overwrite:
                # Probe the destination: success (a node) means it exists and
                # the rename must be refused; NoSuchChildError is the good case.
                d2.addCallback(lambda ign: to_parent.get(to_childname))
                def _expect_fail(res):
                    if not isinstance(res, Failure):
                        raise createSFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + str(to_userpath, "utf-8"))

                    # It is OK if we fail for errors other than NoSuchChildError, since that probably
                    # indicates some problem accessing the destination directory.
                    res.trap(NoSuchChildError)
                d2.addBoth(_expect_fail)

            # If there are heisenfiles to be written at the 'from' direntry, then ensure
            # they will now be written at the 'to' direntry instead.
            d2.addCallback(lambda ign:
                           self._rename_heisenfiles(from_userpath, from_parent, from_childname,
                                                    to_userpath, to_parent, to_childname, overwrite=overwrite))

            def _move(renamed):
                # FIXME: use move_child_to_path to avoid possible data loss due to #943
                #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)

                d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
                def _check(err):
                    if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
                                       (err, from_pathstring, to_pathstring, overwrite), level=NOISY)

                    # If heisenfiles were renamed, a NoSuchChildError from the
                    # directory move is expected (there was no committed entry).
                    if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
                        return None
                    if not overwrite and err.check(ExistingChildError):
                        raise createSFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + str(to_userpath, "utf-8"))

                    return err
                d3.addBoth(_check)
                return d3
            d2.addCallback(_move)
            return d2
        d.addCallback(_got)
        d.addBoth(_convert_error, request)
        return d
1562
1563    def makeDirectory(self, pathstring, attrs):
1564        request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1565        self.log(request, level=OPERATIONAL)
1566
1567        path = self._path_from_string(pathstring)
1568        metadata = _attrs_to_metadata(attrs)
1569        if 'no-write' in metadata:
1570            def _denied(): raise createSFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1571            return defer.execute(_denied)
1572
1573        d = self._get_root(path)
1574        d.addCallback(lambda root_and_path:
1575                      self._get_or_create_directories(root_and_path[0], root_and_path[1], metadata))
1576        d.addBoth(_convert_error, request)
1577        return d
1578
    def _get_or_create_directories(self, node, path, metadata):
        # Recursively descend 'path' below 'node', creating any missing
        # intermediate directories, and return a Deferred firing with the
        # final directory node.
        if not IDirectoryNode.providedBy(node):
            # TODO: provide the name of the blocking file in the error message.
            def _blocked(): raise createSFTPError(FX_FAILURE, "cannot create directory because there "
                                                        "is a file in the way") # close enough
            return defer.execute(_blocked)

        if not path:
            return defer.succeed(node)
        d = node.get(path[0])
        def _maybe_create(f):
            # Only a missing child triggers creation; other errors propagate.
            f.trap(NoSuchChildError)
            # NOTE(review): 'metadata' is accepted but not passed to
            # create_subdirectory, so newly created directories get default
            # metadata. Confirm whether that is intentional.
            return node.create_subdirectory(path[0])
        d.addErrback(_maybe_create)
        d.addCallback(self._get_or_create_directories, path[1:], metadata)
        return d
1595
1596    def removeFile(self, pathstring):
1597        request = ".removeFile(%r)" % (pathstring,)
1598        self.log(request, level=OPERATIONAL)
1599
1600        path = self._path_from_string(pathstring)
1601        d = self._remove_object(path, must_be_file=True)
1602        d.addBoth(_convert_error, request)
1603        return d
1604
1605    def removeDirectory(self, pathstring):
1606        request = ".removeDirectory(%r)" % (pathstring,)
1607        self.log(request, level=OPERATIONAL)
1608
1609        path = self._path_from_string(pathstring)
1610        d = self._remove_object(path, must_be_directory=True)
1611        d.addBoth(_convert_error, request)
1612        return d
1613
    def _remove_object(self, path, must_be_directory=False, must_be_file=False):
        """
        Delete the directory entry at 'path'. For files, any open heisenfiles
        for the entry are abandoned first; if one was abandoned, the entry
        itself need not exist. Returns a Deferred.
        """
        userpath = self._path_to_utf8(path)
        d = self._get_parent_or_node(path)
        def _got_parent(parent_and_childname):
            (parent, childname) = parent_and_childname
            # childname is None when the path addressed a node directly by
            # URI; such a node has no parent entry to remove.
            if childname is None:
                raise createSFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")

            direntry = _direntry_for(parent, childname)
            d2 = defer.succeed(False)
            if not must_be_directory:
                # Abandon unwritten open file handles for this entry; fires
                # with True if any were abandoned.
                d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))

            d2.addCallback(lambda abandoned:
                           parent.delete(childname, must_exist=not abandoned,
                                         must_be_directory=must_be_directory, must_be_file=must_be_file))
            return d2
        d.addCallback(_got_parent)
        return d
1633
    def openDirectory(self, pathstring):
        """
        Handle SFTP OPENDIR/READDIR: list the directory at 'pathstring'.

        Returns a Deferred firing with a StoppableList of
        (filename_utf8, longname, attrs) tuples, one per child.
        """
        request = ".openDirectory(%r)" % (pathstring,)
        self.log(request, level=OPERATIONAL)

        path = self._path_from_string(pathstring)
        d = self._get_parent_or_node(path)
        def _got_parent_or_node(parent_or_node__and__childname):
            (parent_or_node, childname) = parent_or_node__and__childname
            if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
                               (parent_or_node, childname, pathstring), level=NOISY)
            # childname is None when the path addressed the node directly.
            if childname is None:
                return parent_or_node
            else:
                return parent_or_node.get(childname)
        d.addCallback(_got_parent_or_node)
        def _list(dirnode):
            if dirnode.is_unknown():
                raise createSFTPError(FX_PERMISSION_DENIED,
                                "cannot list an unknown cap as a directory. Upgrading the gateway "
                                "to a later Tahoe-LAFS version may help")
            if not IDirectoryNode.providedBy(dirnode):
                raise createSFTPError(FX_PERMISSION_DENIED,
                                "cannot list a file as if it were a directory")

            d2 = dirnode.list()
            def _render(children):
                parent_readonly = dirnode.is_readonly()
                results = []
                for filename, (child, metadata) in list(children.items()):
                    # The file size may be cached or absent.
                    metadata['no-write'] = _no_write(parent_readonly, child, metadata)
                    attrs = _populate_attrs(child, metadata)
                    filename_utf8 = filename.encode('utf-8')
                    # longname is the 'ls -l'-style line shown by sftp clients.
                    longname = _lsLine(filename_utf8, attrs)
                    results.append( (filename_utf8, longname, attrs) )
                return StoppableList(results)
            d2.addCallback(_render)
            return d2
        d.addCallback(_list)
        d.addBoth(_convert_error, request)
        return d
1675
    def getAttrs(self, pathstring, followLinks):
        """
        Handle SFTP STAT/LSTAT: return the attributes of the object at
        'pathstring' as a Deferred dict. 'followLinks' is irrelevant because
        symlinks are not supported by this frontend.
        """
        request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
        self.log(request, level=OPERATIONAL)

        # When asked about a specific file, report its current size.
        # TODO: the modification time for a mutable file should be
        # reported as the update time of the best version. But that
        # information isn't currently stored in mutable shares, I think.

        path = self._path_from_string(pathstring)
        userpath = self._path_to_utf8(path)
        d = self._get_parent_or_node(path)
        def _got_parent_or_node(parent_or_node__and__childname):
            (parent_or_node, childname) = parent_or_node__and__childname
            if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)

            # Some clients will incorrectly try to get the attributes
            # of a file immediately after opening it, before it has been put
            # into the all_heisenfiles table. This is a race condition bug in
            # the client, but we handle it anyway by calling .sync() on all
            # files matching either the path or the direntry.

            direntry = _direntry_for(parent_or_node, childname)
            d2 = self._sync_heisenfiles(userpath, direntry)

            if childname is None:
                # The path resolved to a node directly (e.g. a URI path).
                node = parent_or_node
                d2.addCallback(lambda ign: node.get_current_size())
                d2.addCallback(lambda size:
                               _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
            else:
                parent = parent_or_node
                d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
                def _got(child_and_metadata):
                    (child, metadata) = child_and_metadata
                    if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
                    _assert(IDirectoryNode.providedBy(parent), parent=parent)
                    metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
                    d3 = child.get_current_size()
                    d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
                    return d3
                def _nosuch(err):
                    # No committed directory entry: fall back to any open
                    # heisenfile for this direntry (the client may have just
                    # created the file and not yet closed it).
                    if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
                    err.trap(NoSuchChildError)
                    if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
                                       (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
                    if direntry in all_heisenfiles:
                        files = all_heisenfiles[direntry]
                        if len(files) == 0:  # pragma: no cover
                            return err
                        # use the heisenfile that was most recently opened
                        return files[-1].getAttrs()
                    return err
                d2.addCallbacks(_got, _nosuch)
            return d2
        d.addCallback(_got_parent_or_node)
        d.addBoth(_convert_error, request)
        return d
1734
1735    def setAttrs(self, pathstring, attrs):
1736        request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1737        self.log(request, level=OPERATIONAL)
1738
1739        if "size" in attrs:
1740            # this would require us to download and re-upload the truncated/extended
1741            # file contents
1742            def _unsupported(): raise createSFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1743            return defer.execute(_unsupported)
1744
1745        path = self._path_from_string(pathstring)
1746        userpath = self._path_to_utf8(path)
1747        d = self._get_parent_or_node(path)
1748        def _got_parent_or_node(parent_or_node__and__childname):
1749            (parent_or_node, childname) = parent_or_node__and__childname
1750            if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1751
1752            direntry = _direntry_for(parent_or_node, childname)
1753            d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1754
1755            def _update(updated_heisenfiles):
1756                if childname is None:
1757                    if updated_heisenfiles:
1758                        return None
1759                    raise createSFTPError(FX_NO_SUCH_FILE, userpath)
1760                else:
1761                    desired_metadata = _attrs_to_metadata(attrs)
1762                    if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1763
1764                    d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1765                    def _nosuch(err):
1766                        if updated_heisenfiles:
1767                            err.trap(NoSuchChildError)
1768                        else:
1769                            return err
1770                    d3.addErrback(_nosuch)
1771                    return d3
1772            d2.addCallback(_update)
1773            d2.addCallback(lambda ign: None)
1774            return d2
1775        d.addCallback(_got_parent_or_node)
1776        d.addBoth(_convert_error, request)
1777        return d
1778
1779    def readLink(self, pathstring):
1780        self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1781
1782        def _unsupported(): raise createSFTPError(FX_OP_UNSUPPORTED, "readLink")
1783        return defer.execute(_unsupported)
1784
1785    def makeLink(self, linkPathstring, targetPathstring):
1786        self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1787
1788        # If this is implemented, note the reversal of arguments described in point 7 of
1789        # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1790
1791        def _unsupported(): raise createSFTPError(FX_OP_UNSUPPORTED, "makeLink")
1792        return defer.execute(_unsupported)
1793
    def extendedRequest(self, extensionName, extensionData):
        """
        Handle the OpenSSH SFTP extensions posix-rename@openssh.com,
        statvfs@openssh.com and fstatvfs@openssh.com; anything else gets
        FX_OP_UNSUPPORTED.
        """
        self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)

        # We implement the three main OpenSSH SFTP extensions; see
        # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>

        if extensionName == b'posix-rename@openssh.com':
            def _bad(): raise createSFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")

            # Wire format: uint32 fromPathLen, fromPath bytes,
            #              uint32 toPathLen, toPath bytes.
            # Each length check guards the struct.unpack/slice that follows.
            if 4 > len(extensionData): return defer.execute(_bad)
            (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
            if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)

            (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
            if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)

            fromPathstring = extensionData[4:(4 + fromPathLen)]
            toPathstring = extensionData[(8 + fromPathLen):]
            d = self.renameFile(fromPathstring, toPathstring, overwrite=True)

            # Twisted conch assumes that the response from an extended request is either
            # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
            # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
            def _succeeded(ign):
                raise createSFTPError(FX_OK, "request succeeded")
            d.addCallback(_succeeded)
            return d

        if extensionName == b'statvfs@openssh.com' or extensionName == b'fstatvfs@openssh.com':
            # Fixed fictitious filesystem statistics: Tahoe has no real notion
            # of total/free space here.
            # f_bsize and f_frsize should be the same to avoid a bug in 'df'
            return defer.succeed(struct.pack('>11Q',
                1024,         # uint64  f_bsize     /* file system block size */
                1024,         # uint64  f_frsize    /* fundamental fs block size */
                628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
                314159265,    # uint64  f_bfree     /* free blocks in file system */
                314159265,    # uint64  f_bavail    /* free blocks for non-root */
                200000000,    # uint64  f_files     /* total file inodes */
                100000000,    # uint64  f_ffree     /* free file inodes */
                100000000,    # uint64  f_favail    /* free file inodes for non-root */
                0x1AF5,       # uint64  f_fsid      /* file system id */
                2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
                65535,        # uint64  f_namemax   /* maximum filename length */
                ))

        def _unsupported(): raise createSFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
                                                               (extensionName, len(extensionData)))
        return defer.execute(_unsupported)
1841
1842    def realPath(self, pathstring):
1843        self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1844
1845        return self._path_to_utf8(self._path_from_string(pathstring))
1846
1847    def _path_to_utf8(self, path):
1848        return (u"/" + u"/".join(path)).encode('utf-8')
1849
1850    def _path_from_string(self, pathstring):
1851        if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1852
1853        _assert(isinstance(pathstring, bytes), pathstring=pathstring)
1854
1855        # The home directory is the root directory.
1856        pathstring = pathstring.strip(b"/")
1857        if pathstring == b"" or pathstring == b".":
1858            path_utf8 = []
1859        else:
1860            path_utf8 = pathstring.split(b"/")
1861
1862        # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1863        # "Servers SHOULD interpret a path name component ".." as referring to
1864        #  the parent directory, and "." as referring to the current directory."
1865        path = []
1866        for p_utf8 in path_utf8:
1867            if p_utf8 == b"..":
1868                # ignore excess .. components at the root
1869                if len(path) > 0:
1870                    path = path[:-1]
1871            elif p_utf8 != b".":
1872                try:
1873                    p = p_utf8.decode('utf-8', 'strict')
1874                except UnicodeError:
1875                    raise createSFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1876                path.append(p)
1877
1878        if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1879        return path
1880
1881    def _get_root(self, path):
1882        # return Deferred (root, remaining_path)
1883        d = defer.succeed(None)
1884        if path and path[0] == u"uri":
1885            d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1886            d.addCallback(lambda root: (root, path[2:]))
1887        else:
1888            d.addCallback(lambda ign: (self._root, path))
1889        return d
1890
1891    def _get_parent_or_node(self, path):
1892        # return Deferred (parent, childname) or (node, None)
1893        d = self._get_root(path)
1894        def _got_root(root_and_remaining_path):
1895            (root, remaining_path) = root_and_remaining_path
1896            if not remaining_path:
1897                return (root, None)
1898            else:
1899                d2 = root.get_child_at_path(remaining_path[:-1])
1900                d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1901                return d2
1902        d.addCallback(_got_root)
1903        return d
1904
1905
@implementer(ITransport)
class FakeTransport(object):
    # A do-nothing ITransport, installed on a session protocol whose
    # transport is None (a Twisted bug worked around in ShellSession).
    # Writes are discarded after being logged at NOISY level.

    def write(self, data):
        logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)

    def writeSequence(self, data):
        logmsg("FakeTransport.writeSequence(...)", level=NOISY)

    def loseConnection(self):
        logmsg("FakeTransport.loseConnection()", level=NOISY)

    def getHost(self):
        # Not needed by the session machinery that uses this object.
        raise NotImplementedError()

    def getPeer(self):
        raise NotImplementedError()
1922
1923
@implementer(ISession)
class ShellSession(PrefixingLogMixin):
    """
    A minimal ISession for SFTP-only accounts: interactive shells and
    arbitrary commands are refused with a short error message. The only
    command honoured is the 'df -P -k /' probe that sshfs issues.
    """
    def __init__(self, userHandler):
        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
        if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)

    def getPty(self, terminal, windowSize, attrs):
        self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)

    def openShell(self, protocol):
        self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
        if hasattr(protocol, 'transport') and protocol.transport is None:
            protocol.transport = FakeTransport()  # work around Twisted bug

        return self._unsupported(protocol)

    def execCommand(self, protocol, cmd):
        self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
        if hasattr(protocol, 'transport') and protocol.transport is None:
            protocol.transport = FakeTransport()  # work around Twisted bug

        # On Python 3, Twisted conch delivers 'cmd' as bytes, so the old
        # comparison against a str literal could never match and sshfs's
        # disk-space probe was always rejected. Accept either type.
        if isinstance(cmd, bytes):
            cmd = cmd.decode("utf-8", "replace")

        d = defer.succeed(None)
        if cmd == "df -P -k /":
            # The session channel requires bytes on Python 3, so the canned
            # 'df' output is written as bytes.
            d.addCallback(lambda ign: protocol.write(
                          b"Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
                          b"tahoe                628318530 314159265 314159265      50% /\r\n"))
            d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
        else:
            d.addCallback(lambda ign: self._unsupported(protocol))
        return d

    def _unsupported(self, protocol):
        # Refuse the request: report an explanation on stderr and exit with
        # status 1. The error text must be bytes for the session channel.
        d = defer.succeed(None)
        d.addCallback(lambda ign: protocol.errReceived(
                      b"This server supports only the SFTP protocol. It does not support SCP,\r\n"
                      b"interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
        d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
        return d

    def windowChanged(self, newWindowSize):
        self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)

    def eofReceived(self):
        self.log(".eofReceived()", level=OPERATIONAL)

    def closed(self):
        self.log(".closed()", level=OPERATIONAL)
1971
1972
# Register ShellSession as the ISession adapter for SFTPUserHandler, so that
# ISession(userHandler) yields ShellSession(userHandler). Twisted's adaptation
# machinery is used because the session object must be distinct from the
# SFTPUserHandler itself.
components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1977
1978
1979from allmydata.frontends.auth import AccountFileChecker, NeedRootcapLookupScheme
1980
@implementer(portal.IRealm)
class Dispatcher(object):
    """
    IRealm that turns an authenticated avatar into an SFTPUserHandler
    rooted at the avatar's rootcap.
    """
    def __init__(self, client):
        self._client = client

    def requestAvatar(self, avatarId, mind, *interfaces):
        # Exactly one interface is expected, and it must be IConchUser.
        interface, = interfaces
        _assert(interface == IConchUser, interface=interface)
        root = self._client.create_node_from_uri(avatarId.rootcap)
        user = SFTPUserHandler(self._client, root, avatarId.username)
        # (interface, avatar, logout-callable) per IRealm.requestAvatar.
        return (interface, user, user.logout)
1992
1993
class SFTPServer(service.MultiService):
    """
    MultiService that listens for SSH connections and serves the Tahoe
    filesystem over SFTP to users authenticated via an account file.
    """
    # The type in Twisted for services is wrong in 22.10...
    # https://github.com/twisted/twisted/issues/10135
    name = "frontend:sftp"  # type: ignore[assignment]

    def __init__(self, client, accountfile,
                 sftp_portstr, pubkey_file, privkey_file):
        precondition(isinstance(accountfile, (str, type(None))), accountfile)
        precondition(isinstance(pubkey_file, str), pubkey_file)
        precondition(isinstance(privkey_file, str), privkey_file)
        service.MultiService.__init__(self)

        the_portal = portal.Portal(Dispatcher(client))

        if accountfile:
            the_portal.registerChecker(AccountFileChecker(self, accountfile))
        else:
            # we could leave this anonymous, with just the /uri/CAP form
            raise NeedRootcapLookupScheme("must provide an account file")

        fs_enc = get_filesystem_encoding()
        pubkey = keys.Key.fromFile(pubkey_file.encode(fs_enc))
        privkey = keys.Key.fromFile(privkey_file.encode(fs_enc))

        class SSHFactory(factory.SSHFactory):
            publicKeys = {pubkey.sshType(): pubkey}
            privateKeys = {privkey.sshType(): privkey}
            def getPrimes(self):
                # if present, this enables diffie-hellman-group-exchange
                try:
                    return primes.parseModuliFile("/etc/ssh/moduli")
                except IOError:
                    return None

        ssh_factory = SSHFactory()
        ssh_factory.portal = the_portal

        listener = strports.service(six.ensure_str(sftp_portstr), ssh_factory)
        listener.setServiceParent(self)
Note: See TracBrowser for help on using the repository browser.