source: trunk/src/allmydata/mutable/filenode.py

Last change on this file was 101453c, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-10-23T13:57:32Z

Make operation non-blocking (assuming GIL is released)

1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6import random
7
8from zope.interface import implementer
9from twisted.internet import defer, reactor
10from foolscap.api import eventually
11
12from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
13     NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
14     IMutableFileVersion, IWriteable
15from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
16from allmydata.util.assertutil import precondition
17from allmydata.util.cputhreadpool import defer_to_thread
18from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
19                          WriteableMDMFFileURI, ReadonlyMDMFFileURI
20from allmydata.monitor import Monitor
21from allmydata.mutable.publish import Publish, MutableData,\
22                                      TransformingUploadable
23from allmydata.mutable.common import (
24    MODE_READ,
25    MODE_WRITE,
26    MODE_CHECK,
27    UnrecoverableFileError,
28    UncoordinatedWriteError,
29    derive_mutable_keys,
30)
31from allmydata.mutable.servermap import ServerMap, ServermapUpdater
32from allmydata.mutable.retrieve import Retrieve
33from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
34from allmydata.mutable.repairer import Repairer
35
36
37class BackoffAgent(object):
38    # these parameters are copied from foolscap.reconnector, which gets them
39    # from twisted.internet.protocol.ReconnectingClientFactory
40    initialDelay = 1.0
41    factor = 2.7182818284590451 # (math.e)
42    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
43    maxRetries = 4
44
45    def __init__(self):
46        self._delay = self.initialDelay
47        self._count = 0
48    def delay(self, node, f):
49        self._count += 1
        if self._count >= self.maxRetries:
            return f
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d

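# Illustrative sketch (not part of the original module): modify() accepts
# any backoffer with BackoffAgent.delay's (node, failure) calling
# convention, so a caller who wants predictable retry timing could swap in
# a fixed delay. The helper name below is hypothetical.
def _example_fixed_delay_backoffer(seconds=2.0, max_tries=4):
    tries = [0]  # mutable counter shared with the closure
    def backoffer(node, f):
        tries[0] += 1
        if tries[0] >= max_tries:
            return f  # returning the Failure tells the modify loop to give up
        d = defer.Deferred()
        reactor.callLater(seconds, d.callback, None)
        return d
    return backoffer
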
# use nodemaker.create_mutable_file() to make one of these

@implementer(IMutableFileNode, ICheckable)
class MutableFileNode(object):

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._most_recent_size = None
        # filled in after __init__ if we're being created for the first time;
        # filled in by the servermap updater before publishing, otherwise.
        # set to this default value in case neither of those things happen,
        # or in case the servermap can't find any shares to tell us what
        # to publish as.
        self._protocol_version = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

        # Starting with MDMF, we can get these from caps if they're
        # there. Leave them alone for now; they'll be filled in by my
        # init_from_cap method if necessary.
        self._downloader_hints = {}

    def __repr__(self):
        if hasattr(self, '_uri'):
            return "<%s %x %s %r>" % (self.__class__.__name__, id(self),
                                      'RO' if self.is_readonly() else 'RW',
                                      self._uri.abbrev())
        else:
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)

    def init_from_cap(self, filecap):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
            self._protocol_version = MDMF_VERSION
        elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
            self._protocol_version = SDMF_VERSION

        self._uri = filecap
        self._writekey = None

        if not filecap.is_readonly() and filecap.is_mutable():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None

        return self

    @deferredutil.async_to_deferred
    async def create_with_keys(self, keypair, contents,
                               version=SDMF_VERSION):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file()). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = keypair
        self._writekey, self._encprivkey, self._fingerprint = await defer_to_thread(
            derive_mutable_keys, keypair
        )
        if version == MDMF_VERSION:
            self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        elif version == SDMF_VERSION:
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return await self._upload(initial_contents, None)

    def _get_initial_contents(self, contents):
        if contents is None:
            return MutableData(b"")

        if isinstance(contents, bytes):
            return MutableData(contents)

        if IMutableUploadable.providedBy(contents):
            return contents

        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)

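    # Illustrative note (not part of the original module): the callable form
    # of `contents` receives the freshly created node, so the initial
    # contents can refer to the node itself, e.g. (hypothetical usage)::
    #
    #   def initial(node):
    #       return MutableData(b"read with: " + node.get_readonly_uri())
    #
    #   nodemaker.create_mutable_file(initial)
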
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey

    def get_write_enabler(self, server):
        seed = server.get_foolscap_write_enabler_seed()
        assert len(seed) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, seed)
    def get_renewal_secret(self, server):
        crs = self._secret_holder.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        lease_seed = server.get_lease_seed()
        assert len(lease_seed) == 20
        return hashutil.bucket_renewal_secret_hash(frs, lease_seed)
    def get_cancel_secret(self, server):
        ccs = self._secret_holder.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        lease_seed = server.get_lease_seed()
        assert len(lease_seed) == 20
        return hashutil.bucket_cancel_secret_hash(fcs, lease_seed)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    ####################################
    # IFilesystemNode

    def get_size(self):
        return self._most_recent_size

    def get_current_size(self):
        d = self.get_size_of_best_version()
        d.addCallback(self._stash_size)
        return d

    def _stash_size(self, size):
        self._most_recent_size = size
        return size

    def get_cap(self):
        return self._uri
    def get_readcap(self):
        return self._uri.get_readonly()
    def get_verify_cap(self):
        return self._uri.get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
            return None
        return self._uri

    def get_uri(self):
        return self._uri.to_string()

    def get_write_uri(self):
        if self.is_readonly():
            return None
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_readonly(self):
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        return ro

    def is_mutable(self):
        return self._uri.is_mutable()

    def is_readonly(self):
        return self._uri.is_readonly()

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return not self._uri.is_mutable()

    def raise_error(self):
        pass

    def __hash__(self):
        return hash((self.__class__, self._uri))

    def __eq__(self, them):
        if type(self) != type(them):
            return False
        return self._uri == them._uri

    def __ne__(self, them):
        return not (self == them)

    #################################
    # ICheckable

    def check(self, monitor, verify=False, add_lease=False):
        checker = MutableChecker(self, self._storage_broker,
                                 self._history, monitor)
        return checker.check(verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        checker = MutableCheckAndRepairer(self, self._storage_broker,
                                          self._history, monitor)
        return checker.check(verify, add_lease)

    #################################
    # IRepairable

    def repair(self, check_results, force=False, monitor=None):
        assert ICheckResults(check_results)
        r = Repairer(self, check_results, self._storage_broker,
                     self._history, monitor)
        d = r.start(force)
        return d


    #################################
    # IFileNode

    def get_best_readable_version(self):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent.
        """
        return self.get_readable_version()


    def get_readable_version(self, servermap=None, version=None):
        """
        I return a Deferred that fires with a MutableFileVersion for my
        version argument, if there is a recoverable file of that version
        on the grid. If there is no recoverable version, I fire with an
        UnrecoverableFileError.

        If a servermap is provided, I look in there for the requested
        version. If no servermap is provided, I create and update a new
        one.

        If no version is provided, then I return a MutableFileVersion
        representing the best recoverable version of the file.
        """
        d = self._get_version_from_servermap(MODE_READ, servermap, version)
        def _build_version(servermap_and_their_version):
            (servermap, their_version) = servermap_and_their_version
            assert their_version in servermap.recoverable_versions()
            assert their_version in servermap.make_versionmap()

            mfv = MutableFileVersion(self,
                                     servermap,
                                     their_version,
                                     self._storage_index,
                                     self._storage_broker,
                                     self._readkey,
                                     history=self._history)
            assert mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
            # our caller can use this to download the contents of the
            # mutable file.
            return mfv
        return d.addCallback(_build_version)


    def _get_version_from_servermap(self,
                                    mode,
                                    servermap=None,
                                    version=None):
        """
        I return a Deferred that fires with (servermap, version).

        This function performs validation and a servermap update. If it
        returns (servermap, version), the caller can assume that:
            - servermap was last updated in mode.
            - version is recoverable, and corresponds to the servermap.

        If version and servermap are provided to me, I will validate
        that version exists in the servermap, and that the servermap was
        updated correctly.

        If version is not provided, but servermap is, I will validate
        the servermap and return the best recoverable version that I can
        find in the servermap.

        If the version is provided but the servermap isn't, I will
        obtain a servermap that has been updated in the correct mode and
        validate that version is found and recoverable.

        If neither servermap nor version are provided, I will obtain a
        servermap updated in the correct mode, and return the best
        recoverable version that I can find in there.
        """
        # XXX: wording ^^^^
        if servermap and servermap.get_last_update()[0] == mode:
            d = defer.succeed(servermap)
        else:
            d = self._get_servermap(mode)

        def _get_version(servermap, v):
            if v and v not in servermap.recoverable_versions():
                v = None
            elif not v:
                v = servermap.best_recoverable_version()
            if not v:
                raise UnrecoverableFileError("no recoverable versions")

            return (servermap, v)
        return d.addCallback(_get_version, version)


    def download_best_version(self):
        """
        I return a Deferred that fires with the contents of the best
        version of this mutable file.
        """
        return self._do_serialized(self._download_best_version)


    def _download_best_version(self):
        """
        I am the serialized sibling of download_best_version.
        """
        d = self.get_best_readable_version()
        d.addCallback(self._record_size)
        d.addCallback(lambda version: version.download_to_data())

        # It is possible that the download will fail because there
        # aren't enough shares to be had. If so, we will try again after
        # updating the servermap in MODE_WRITE, which may find more
        # shares than updating in MODE_READ, as we just did. We can do
        # this by getting the best mutable version and downloading from
        # that -- the best mutable version will be a MutableFileVersion
        # with a servermap that was last updated in MODE_WRITE, as we
        # want. If this fails, then we give up.
        def _maybe_retry(failure):
            failure.trap(NotEnoughSharesError)

            d = self.get_best_mutable_version()
            d.addCallback(self._record_size)
            d.addCallback(lambda version: version.download_to_data())
            return d

        d.addErrback(_maybe_retry)
        return d


    def _record_size(self, mfv):
        """
        I record the size of a mutable file version.
        """
        self._most_recent_size = mfv.get_size()
        return mfv


    def get_size_of_best_version(self):
        """
        I return the size of the best version of this mutable file.

        This is equivalent to calling get_size() on the result of
        get_best_readable_version().
        """
        d = self.get_best_readable_version()
        return d.addCallback(lambda mfv: mfv.get_size())


    #################################
    # IMutableFileNode

    def get_best_mutable_version(self, servermap=None):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent. I am like get_best_readable_version, except that I
        will try to make a writeable version if I can.
        """
        return self.get_mutable_version(servermap=servermap)


    def get_mutable_version(self, servermap=None, version=None):
        """
        I return a version of this mutable file, as a Deferred that
        fires with a MutableFileVersion.

        If version is provided, the Deferred will fire with a
        MutableFileVersion initialized with that version. Otherwise, it
        will fire with the best version that I can recover.

        If servermap is provided, I will use that to find versions
        instead of performing my own servermap update.
        """
        if self.is_readonly():
            return self.get_readable_version(servermap=servermap,
                                             version=version)

        # get_mutable_version => write intent, so we require that the
        # servermap is updated in MODE_WRITE
        d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
        def _build_version(servermap_and_smap_version):
            # these should have been set by the servermap update.
            (servermap, smap_version) = servermap_and_smap_version
            assert self._secret_holder
            assert self._writekey

            mfv = MutableFileVersion(self,
                                     servermap,
                                     smap_version,
                                     self._storage_index,
                                     self._storage_broker,
                                     self._readkey,
                                     self._writekey,
                                     self._secret_holder,
                                     history=self._history)
            assert not mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
            return mfv

        return d.addCallback(_build_version)


    # XXX: I'm uncomfortable with the difference between upload and
    #      overwrite, which, FWICT, is basically that you don't have to
    #      do a servermap update before you overwrite. We split them up
    #      that way anyway, so I guess there's no real difficulty in
    #      offering both ways to callers, but it also makes the
    #      public-facing API cluttery, and makes it hard to discern the
    #      right way of doing things.

    # In general, we leave it to callers to ensure that they aren't
    # going to cause UncoordinatedWriteErrors when working with
    # MutableFileVersions. We know that the next three operations
    # (upload, overwrite, and modify) will all operate on the same
    # version, so we say that only one of them can be going on at once,
    # and serialize them to ensure that that actually happens, since as
    # the caller in this situation it is our job to do that.
    def overwrite(self, new_contents):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents. This is equivalent to calling
        overwrite on the result of get_best_mutable_version with
        new_contents as an argument. I return a Deferred that eventually
        fires with the results of my replacement process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._overwrite, new_contents)


    def _overwrite(self, new_contents):
        """
        I am the serialized sibling of overwrite.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.overwrite(new_contents))
        d.addCallback(self._did_upload, new_contents.get_size())
        return d


    def upload(self, new_contents, servermap):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents, using servermap instead of
        creating/updating our own servermap. I return a Deferred that
        fires with the results of my upload.
        """
        # TODO: Update downloader hints
        return self._do_serialized(self._upload, new_contents, servermap)


    def modify(self, modifier, backoffer=None):
        """
        I modify the contents of the best recoverable version of this
        mutable file with the modifier. This is equivalent to calling
        modify on the result of get_best_mutable_version. I return a
        Deferred that eventually fires with an UploadResults instance
        describing this process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._modify, modifier, backoffer)


    def _modify(self, modifier, backoffer):
        """
        I am the serialized sibling of modify.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
        return d


    def download_version(self, servermap, version, fetch_privkey=False):
        """
        Download the specified version of this mutable file. I return a
        Deferred that fires with the contents of the specified version
        as a bytestring, or errbacks if the file is not recoverable.
        """
        d = self.get_readable_version(servermap, version)
        return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))


    def get_servermap(self, mode):
        """
        I return a servermap that has been updated in mode.

        mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
        MODE_ANYTHING. See servermap.py for more on what these mean.
        """
        return self._do_serialized(self._get_servermap, mode)


    def _get_servermap(self, mode):
        """
        I am a serialized twin to get_servermap.
        """
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode)
        # The servermap will tell us about the most recent size of the
        # file, so we may as well set that so that callers might get
        # more data about us.
        if not self._most_recent_size:
            d.addCallback(self._get_size_from_servermap)
        return d


    def _get_size_from_servermap(self, servermap):
        """
        I extract the size of the best version of this file and record
        it in self._most_recent_size. I return the servermap that I was
        given.
        """
        if servermap.recoverable_versions():
            v = servermap.best_recoverable_version()
            size = v[4] # verinfo[4] == size
            self._most_recent_size = size
        return servermap


    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
                             mode)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        return u.update()


    #def set_version(self, version):
        # I can be set in two ways:
        #  1. When the node is created.
        #  2. (for an existing share) when the Servermap is updated
        #     before I am read.
    #    assert version in (MDMF_VERSION, SDMF_VERSION)
    #    self._protocol_version = version


    def get_version(self):
        return self._protocol_version


    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MFN instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d


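    # Illustrative note (not part of the original module): because every
    # public mutating entry point funnels through _do_serialized, two calls
    # made back-to-back never interleave; the second is simply queued on the
    # long-lived self._serializer Deferred::
    #
    #   d1 = node.overwrite(MutableData(b"first"))
    #   d2 = node.overwrite(MutableData(b"second"))  # runs only after d1
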
    def _upload(self, new_contents, servermap):
        """
        A MutableFileNode still has to have some way of getting
        published initially, which is what I am here for. After that,
        all publishing, updating, modifying and so on happens through
        MutableFileVersions.
        """
        assert self._pubkey, "update_servermap must be called before publish"

        # Define IPublishInvoker with a set_downloader_hints method?
        # Then have the publisher call that method when it's done publishing?
        p = Publish(self, self._storage_broker, servermap)
        if self._history:
            self._history.notify_publish(p.get_status(),
                                         new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())
        return d


    def set_downloader_hints(self, hints):
        self._downloader_hints = hints

    def _did_upload(self, res, size):
        self._most_recent_size = size
        return res


@implementer(IMutableFileVersion, IWriteable)
class MutableFileVersion(object):
    """
    I represent a specific version (most likely the best version) of a
    mutable file.

    Since I implement IReadable, instances which hold a
    reference to an instance of me are guaranteed the ability (absent
    connection difficulties or unrecoverable versions) to read the file
    that I represent. Depending on whether I was initialized with a
    write capability or not, I may also provide callers the ability to
    overwrite or modify the contents of the mutable file that I
    reference.
    """

    def __init__(self,
                 node,
                 servermap,
                 version,
                 storage_index,
                 storage_broker,
                 readcap,
                 writekey=None,
                 write_secrets=None,
                 history=None):

        self._node = node
        self._servermap = servermap
        self._version = version
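        # Orientation note (added; see servermap.py for the authoritative
        # layout): `version` is a servermap verinfo tuple. As used elsewhere
        # in this file, verinfo[0] is the sequence number, verinfo[2] the
        # SDMF salt (which MDMF lacks), verinfo[3] the segment size, and
        # verinfo[4] the data length.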
        self._storage_index = storage_index
        self._write_secrets = write_secrets
        self._history = history
        self._storage_broker = storage_broker

        #assert isinstance(readcap, IURI)
        self._readcap = readcap

        self._writekey = writekey
        self._serializer = defer.succeed(None)


    def get_sequence_number(self):
        """
        Get the sequence number of the mutable version that I represent.
        """
        return self._version[0] # verinfo[0] == the sequence number


    # TODO: Terminology?
    def get_writekey(self):
        """
        I return a writekey or None if I don't have a writekey.
        """
        return self._writekey


    def set_downloader_hints(self, hints):
        """
        I set the downloader hints.
        """
        assert isinstance(hints, dict)

        self._downloader_hints = hints


    def get_downloader_hints(self):
        """
        I return the downloader hints.
        """
        return self._downloader_hints


    def overwrite(self, new_contents):
        """
        I overwrite the contents of this mutable file version with the
        data in new_contents.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._overwrite, new_contents)


    def _overwrite(self, new_contents):
        assert IMutableUploadable.providedBy(new_contents)
        assert self._servermap.get_last_update()[0] == MODE_WRITE

        return self._upload(new_contents)


    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         first_time = True
         while True:
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old, servermap, first_time)
           first_time = False
           if new == old: break
           try:
             publish(new)
           except UncoordinatedWriteError as e:
             backoffer(e)
             continue
           break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help keep competing
        updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._modify, modifier, backoffer)

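    # Illustrative sketch (not part of the original module): a modifier
    # satisfying the contract above. It is idempotent, so being re-run
    # after an UncoordinatedWriteError retry leaves the contents alone::
    #
    #   TAG = b"\nupdated\n"
    #   def append_tag_once(old_contents, servermap, first_time):
    #       if old_contents.endswith(TAG):
    #           return None          # delta already applied; nothing to do
    #       return old_contents + TAG
    #
    #   d = version.modify(append_tag_once)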

    def _modify(self, modifier, backoffer):
        if backoffer is None:
            backoffer = BackoffAgent().delay
        return self._modify_and_retry(modifier, backoffer, True)


    def _modify_and_retry(self, modifier, backoffer, first_time):
        """
        I try to apply modifier to the contents of this version of the
        mutable file. If I succeed, I return an UploadResults instance
        describing my success. If I fail, I try again after waiting for
        a little bit.
        """
        log.msg("doing modify")
        if first_time:
            d = self._update_servermap()
        else:
            # We ran into trouble; do MODE_CHECK so we're a little more
            # careful on subsequent tries.
            d = self._update_servermap(mode=MODE_CHECK)

        d.addCallback(lambda ignored:
            self._modify_once(modifier, first_time))
        def _retry(f):
            f.trap(UncoordinatedWriteError)
            # Uh oh, it broke. We're allowed to trust the servermap for our
            # first try, but after that we need to update it. It's
            # possible that we've failed due to a race with another
            # uploader, and if the race is to converge correctly, we
            # need to know about that upload.
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(modifier,
                                                  backoffer, False))
            return d2
        d.addErrback(_retry)
        return d


    def _modify_once(self, modifier, first_time):
        """
        I attempt to apply a modifier to the contents of the mutable
        file.
        """
        assert self._servermap.get_last_update()[0] != MODE_READ

        # download_to_data is serialized, so we have to call this to
        # avoid deadlock.
        d = self._try_to_download_data()
        def _apply(old_contents):
            new_contents = modifier(old_contents, self._servermap, first_time)
            precondition((isinstance(new_contents, bytes) or
                          new_contents is None),
                         "Modifier function must return bytes "
                         "or None")

            if new_contents is None or new_contents == old_contents:
                log.msg("no changes")
                # no changes need to be made
                if first_time:
                    return
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we manage an uncontested publish.
                old_uploadable = MutableData(old_contents)
                new_contents = old_uploadable
            else:
                new_contents = MutableData(new_contents)

            return self._upload(new_contents)
        d.addCallback(_apply)
        return d


    def is_readonly(self):
        """
        I return True if this MutableFileVersion provides no write
        access to the file that it encapsulates, and False if it
        provides the ability to modify the file.
        """
        return self._writekey is None


    def is_mutable(self):
        """
        I return True, since mutable files are always mutable by
        somebody.
        """
        return True


    def get_storage_index(self):
        """
        I return the storage index of the reference that I encapsulate.
        """
        return self._storage_index


    def get_size(self):
        """
        I return the length, in bytes, of this readable object.
        """
        return self._servermap.size_of_version(self._version)


    def download_to_data(self, fetch_privkey=False):  # type: ignore # fixme
        """
        I return a Deferred that fires with the contents of this
        readable object as a byte string.
        """
        c = consumer.MemoryConsumer()
        d = self.read(c, fetch_privkey=fetch_privkey)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        return d


    def _try_to_download_data(self):
        """
        I am an unserialized cousin of download_to_data; I am called
        from the children of modify() to download the data associated
        with this mutable version.
        """
        c = consumer.MemoryConsumer()
        # modify will almost certainly write, so we need the privkey.
        d = self._read(c, fetch_privkey=True)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        return d


    def read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I read a portion (possibly all) of the mutable file that I
        reference into consumer.
        """
        return self._do_serialized(self._read, consumer, offset, size,
                                   fetch_privkey)

    def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I am the serialized companion of read.
        """
        r = Retrieve(self._node, self._storage_broker, self._servermap,
                     self._version, fetch_privkey)
        if self._history:
            self._history.notify_retrieve(r.get_status())
        d = r.download(consumer, offset, size)
        return d


    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MFN instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d


    def _upload(self, new_contents):
        #assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self._node, self._storage_broker, self._servermap)
        if self._history:
            self._history.notify_publish(p.get_status(),
                                         new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())
        return d


    def _did_upload(self, res, size):
        self._most_recent_size = size
        return res

    def update(self, data, offset):
        """
        Do an update of this mutable file version by inserting data at
        offset within the file. If offset is the EOF, this is an append
        operation. I return a Deferred that fires with the results of
        the update operation when it has completed.

        In cases where update does not append any data, or where it does
        not append so many blocks that the block count crosses a
        power-of-two boundary, this operation will use roughly
        O(data.get_size()) memory/bandwidth/CPU to perform the update.
        Otherwise, it must download, re-encode, and upload the entire
        file again, which will use O(filesize) resources.
        """
        return self._do_serialized(self._update, data, offset)

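    # Illustrative note (not part of the original module): appending by
    # updating at EOF; for MDMF versions this avoids the whole-file
    # re-encode described above::
    #
    #   d = node.get_best_mutable_version()
    #   d.addCallback(lambda mfv: mfv.update(MutableData(b"tail"),
    #                                        mfv.get_size()))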

    def _update(self, data, offset):
        """
        I update the mutable file version represented by this particular
        IMutableVersion by inserting the data in data at the offset
        offset. I return a Deferred that fires when this has been
        completed.
        """
        new_size = data.get_size() + offset
        old_size = self.get_size()
        segment_size = self._version[3]
        num_old_segments = mathutil.div_ceil(old_size,
                                             segment_size)
        num_new_segments = mathutil.div_ceil(new_size,
                                             segment_size)
        log.msg("got %d old segments, %d new segments" % \
                        (num_old_segments, num_new_segments))

        # We do a whole file re-encode if the file is an SDMF file.
        if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
            log.msg("doing re-encode instead of in-place update")
            return self._do_modify_update(data, offset)

        # Otherwise, we can replace just the parts that are changing.
        log.msg("updating in place")
        d = self._do_update_update(data, offset)
        d.addCallback(self._decode_and_decrypt_segments, data, offset)
        d.addCallback(self._build_uploadable_and_finish, data, offset)
        return d


    def _do_modify_update(self, data, offset):
        """
        I perform a file update by modifying the contents of the file
        after downloading it, then reuploading it. I am less efficient
        than _do_update_update, but am necessary for certain updates.
        """
        def m(old, servermap, first_time):
            start = offset
            rest = offset + data.get_size()
            new = old[:start]
            new += b"".join(data.read(data.get_size()))
            new += old[rest:]
            return new
        return self._modify(m, None)


    def _do_update_update(self, data, offset):
        """
        I start the Servermap update that gets us the data we need to
        continue the update process. I return a Deferred that fires when
        the servermap update is done.
        """
        assert IMutableUploadable.providedBy(data)
        assert self.is_mutable()
        # offset == self.get_size() is valid and means that we are
        # appending data to the file.
        assert offset <= self.get_size()

        segsize = self._version[3]
        # We'll need the segment that the data starts in, regardless of
        # what we'll do later.
        start_segment = offset // segsize

        # We only need the end segment if the data we append does not go
        # beyond the current end-of-file.
        end_segment = start_segment
        if offset + data.get_size() < self.get_size():
            end_data = offset + data.get_size()
            # The last byte we touch is the end_data'th byte, which is actually
            # byte end_data - 1 because bytes are zero-indexed.
            end_data -= 1
            end_segment = end_data // segsize

        self._start_segment = start_segment
        self._end_segment = end_segment

        # Now ask for the servermap to be updated in MODE_WRITE with
        # this update range.
        return self._update_servermap(update_range=(start_segment,
                                                    end_segment))


    def _decode_and_decrypt_segments(self, ignored, data, offset):
        """
        After the servermap update, I take the encrypted and encoded
        data that the servermap fetched while doing its update and
        transform it into decoded-and-decrypted plaintext that can be
        used by the new uploadable. I return a Deferred that fires with
        the segments.
        """
        r = Retrieve(self._node, self._storage_broker, self._servermap,
                     self._version)
        # decode: takes in our blocks and salts from the servermap,
        # returns a Deferred that fires with the corresponding plaintext
        # segments. Does not download -- simply takes advantage of
        # existing infrastructure within the Retrieve class to avoid
        # duplicating code.
        sm = self._servermap
        # XXX: If the methods in the servermap don't work as
        # abstractions, you should rewrite them instead of going around
        # them.
        update_data = sm.update_data
        start_segments = {} # shnum -> start segment
        end_segments = {} # shnum -> end segment
        blockhashes = {} # shnum -> blockhash tree
        for (shnum, original_data) in list(update_data.items()):
            data = [d[1] for d in original_data if d[0] == self._version]
            # data is [(blockhashes,start,end)..]

            # Every data entry in our list should now be for share shnum of
            # a particular version of the mutable file, so all of the
            # entries should be identical.
            datum = data[0]
            assert [x for x in data if x != datum] == []

            # datum is (blockhashes,start,end)
            blockhashes[shnum] = datum[0]
            start_segments[shnum] = datum[1] # (block,salt) bytestrings
            end_segments[shnum] = datum[2]

        d1 = r.decode(start_segments, self._start_segment)
        d2 = r.decode(end_segments, self._end_segment)
        d3 = defer.succeed(blockhashes)
        return deferredutil.gatherResults([d1, d2, d3])


    def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
        """
        After the process has the plaintext segments, I build the
        TransformingUploadable that the publisher will eventually
        re-upload to the grid. I then invoke the publisher with that
        uploadable, and return a Deferred that fires when the publish
        operation has completed without issue.
        """
        u = TransformingUploadable(data, offset,
                                   self._version[3],
                                   segments_and_bht[0],
                                   segments_and_bht[1])
        p = Publish(self._node, self._storage_broker, self._servermap)
        return p.update(u, offset, segments_and_bht[2], self._version)


    def _update_servermap(self, mode=MODE_WRITE, update_range=None):
        """
        I update the servermap. I return a Deferred that fires when the
        servermap update is done.
        """
        if update_range:
            u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
                                 self._servermap,
                                 mode=mode,
                                 update_range=update_range)
        else:
            u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
                                 self._servermap,
                                 mode=mode)
        return u.update()

    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3562
    def get_servermap(self):
        raise NotImplementedError