source: trunk/src/allmydata/codec.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 3.1 KB
Line 
1"""
2CRS encoding and decoding.
3
4Ported to Python 3.
5"""
6
7from zope.interface import implementer
8from allmydata.util import mathutil
9from allmydata.util.assertutil import precondition
10from allmydata.util.cputhreadpool import defer_to_thread
11from allmydata.util.deferredutil import async_to_deferred
12from allmydata.interfaces import ICodecEncoder, ICodecDecoder
13import zfec
14
15@implementer(ICodecEncoder)
16class CRSEncoder(object):
17    ENCODER_TYPE = b"crs"
18
19    def set_params(self, data_size, required_shares, max_shares):
20        assert required_shares <= max_shares
21        self.data_size = data_size
22        self.required_shares = required_shares
23        self.max_shares = max_shares
24        self.share_size = mathutil.div_ceil(data_size, required_shares)
25        self.last_share_padding = mathutil.pad_size(self.share_size, required_shares)
26        self.encoder = zfec.Encoder(required_shares, max_shares)
27
28    def get_encoder_type(self):
29        return self.ENCODER_TYPE
30
31    def get_params(self):
32        return (self.data_size, self.required_shares, self.max_shares)
33
34    def get_serialized_params(self):
35        return b"%d-%d-%d" % (self.data_size, self.required_shares,
36                              self.max_shares)
37
38    def get_block_size(self):
39        return self.share_size
40
41    @async_to_deferred
42    async def encode(self, inshares, desired_share_ids=None):
43        precondition(desired_share_ids is None or len(desired_share_ids) <= self.max_shares, desired_share_ids, self.max_shares)
44
45        if desired_share_ids is None:
46            desired_share_ids = list(range(self.max_shares))
47
48        for inshare in inshares:
49            assert len(inshare) == self.share_size, (len(inshare), self.share_size, self.data_size, self.required_shares)
50        shares = await defer_to_thread(self.encoder.encode, inshares, desired_share_ids)
51        return (shares, desired_share_ids)
52
53    def encode_proposal(self, data, desired_share_ids=None):
54        raise NotImplementedError()
55
56
57@implementer(ICodecDecoder)
58class CRSDecoder(object):
59
60    def set_params(self, data_size, required_shares, max_shares):
61        self.data_size = data_size
62        self.required_shares = required_shares
63        self.max_shares = max_shares
64
65        self.chunk_size = self.required_shares
66        self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
67        self.share_size = self.num_chunks
68        self.decoder = zfec.Decoder(self.required_shares, self.max_shares)
69
70    def get_needed_shares(self):
71        return self.required_shares
72
73    @async_to_deferred
74    async def decode(self, some_shares, their_shareids):
75        precondition(len(some_shares) == len(their_shareids),
76                     len(some_shares), len(their_shareids))
77        precondition(len(some_shares) == self.required_shares,
78                     len(some_shares), self.required_shares)
79        return await defer_to_thread(
80            self.decoder.decode,
81            some_shares,
82            [int(s) for s in their_shareids]
83        )
84
85def parse_params(serializedparams):
86    pieces = serializedparams.split(b"-")
87    return int(pieces[0]), int(pieces[1]), int(pieces[2])
Note: See TracBrowser for help on using the repository browser.