source: trunk/src/allmydata/immutable/repairer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 4.1 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from zope.interface import implementer
6from twisted.internet import defer
7from allmydata.storage.server import si_b2a
8from allmydata.util import log, consumer
9from allmydata.util.assertutil import precondition
10from allmydata.interfaces import IEncryptedUploadable
11
12from allmydata.immutable import upload
13
14@implementer(IEncryptedUploadable)
15class Repairer(log.PrefixingLogMixin):
16    """I generate any shares which were not available and upload them to
17    servers.
18
19    Which servers? Well, I just use the normal upload process, so any servers
20    that will take shares. In fact, I even believe servers if they say that
21    they already have shares even if attempts to download those shares would
22    fail because the shares are corrupted.
23
24    My process of uploading replacement shares proceeds in a segment-wise
25    fashion -- first I ask servers if they can hold the new shares, and wait
26    until enough have agreed then I download the first segment of the file
27    and upload the first block of each replacement share, and only after all
28    those blocks have been uploaded do I download the second segment of the
29    file and upload the second block of each replacement share to its
30    respective server. (I do it this way in order to minimize the amount of
31    downloading I have to do and the amount of memory I have to use at any
32    one time.)
33
34    If any of the servers to which I am uploading replacement shares fails to
35    accept the blocks during this process, then I just stop using that
36    server, abandon any share-uploads that were going to that server, and
37    proceed to finish uploading the remaining shares to their respective
38    servers. At the end of my work, I produce an object which satisfies the
39    ICheckAndRepairResults interface (by firing the deferred that I returned
40    from start() and passing that check-and-repair-results object).
41
42    Before I send any new request to a server, I always ask the 'monitor'
43    object that was passed into my constructor whether this task has been
44    cancelled (by invoking its raise_if_cancelled() method).
45    """
46
47    def __init__(self, filenode, storage_broker, secret_holder, monitor):
48        logprefix = si_b2a(filenode.get_storage_index())[:5]
49        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
50                                       prefix=logprefix)
51        self._filenode = filenode
52        self._storage_broker = storage_broker
53        self._secret_holder = secret_holder
54        self._monitor = monitor
55        self._offset = 0
56
57    def start(self):
58        self.log("starting repair")
59        d = self._filenode.get_segment_size()
60        def _got_segsize(segsize):
61            vcap = self._filenode.get_verify_cap()
62            k = vcap.needed_shares
63            N = vcap.total_shares
64            # Per ticket #1212
65            # (http://tahoe-lafs.org/trac/tahoe-lafs/ticket/1212)
66            happy = 0
67            self._encodingparams = (k, happy, N, segsize)
68            # XXX should pass a reactor to this
69            ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
70            return ul.start(self) # I am the IEncryptedUploadable
71        d.addCallback(_got_segsize)
72        return d
73
74
75    # methods to satisfy the IEncryptedUploader interface
76    # (From the perspective of an uploader I am an IEncryptedUploadable.)
77    def set_upload_status(self, upload_status):
78        self.upload_status = upload_status
79    def get_size(self):
80        size = self._filenode.get_size()
81        assert size is not None
82        return defer.succeed(size)
83    def get_all_encoding_parameters(self):
84        return defer.succeed(self._encodingparams)
85    def read_encrypted(self, length, hash_only):
86        """Returns a deferred which eventually fires with the requested
87        ciphertext, as a list of strings."""
88        precondition(length) # please don't ask to read 0 bytes
89        mc = consumer.MemoryConsumer()
90        d = self._filenode.read(mc, self._offset, length)
91        self._offset += length
92        d.addCallback(lambda ign: mc.chunks)
93        return d
94    def get_storage_index(self):
95        return self._filenode.get_storage_index()
96    def close(self):
97        pass
Note: See TracBrowser for help on using the repository browser.