diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py
index 44c8e95..ed3785b 100644
a
|
b
|
import binascii |
3 | 3 | import copy |
4 | 4 | import time |
5 | 5 | now = time.time |
6 | | from zope.interface import implements |
| 6 | from zope.interface import implements, Interface |
7 | 7 | from twisted.internet import defer |
8 | 8 | from twisted.internet.interfaces import IConsumer |
9 | 9 | |
… |
… |
from allmydata.immutable.repairer import Repairer |
19 | 19 | from allmydata.immutable.downloader.node import DownloadNode |
20 | 20 | from allmydata.immutable.downloader.status import DownloadStatus |
21 | 21 | |
| 22 | class IDownloadStatusHandlingConsumer(Interface): |
| 23 | def set_download_status_read_event(read_ev): |
| 24 | """Record the DownloadStatus 'read event', to be updated with the |
| 25 | time it takes to decrypt each chunk of data.""" |
| 26 | |
22 | 27 | class CiphertextFileNode: |
23 | 28 | def __init__(self, verifycap, storage_broker, secret_holder, |
24 | | terminator, history, download_status=None): |
| 29 | terminator, history): |
25 | 30 | assert isinstance(verifycap, uri.CHKFileVerifierURI) |
26 | 31 | self._verifycap = verifycap |
27 | 32 | self._storage_broker = storage_broker |
28 | 33 | self._secret_holder = secret_holder |
29 | | if download_status is None: |
30 | | ds = DownloadStatus(verifycap.storage_index, verifycap.size) |
31 | | if history: |
32 | | history.add_download(ds) |
33 | | download_status = ds |
34 | 34 | self._terminator = terminator |
35 | 35 | self._history = history |
36 | | self._download_status = download_status |
| 36 | self._download_status = None |
37 | 37 | self._node = None # created lazily, on read() |
38 | 38 | |
39 | 39 | def _maybe_create_download_node(self): |
| 40 | if not self._download_status: |
| 41 | ds = DownloadStatus(self._verifycap.storage_index, |
| 42 | self._verifycap.size) |
| 43 | if self._history: |
| 44 | self._history.add_download(ds) |
| 45 | self._download_status = ds |
40 | 46 | if self._node is None: |
41 | 47 | self._node = DownloadNode(self._verifycap, self._storage_broker, |
42 | 48 | self._secret_holder, |
43 | 49 | self._terminator, |
44 | 50 | self._history, self._download_status) |
45 | 51 | |
46 | | def read(self, consumer, offset=0, size=None, read_ev=None): |
| 52 | def read(self, consumer, offset=0, size=None): |
47 | 53 | """I am the main entry point, from which FileNode.read() can get |
48 | 54 | data. I feed the consumer with the desired range of ciphertext. I |
49 | 55 | return a Deferred that fires (with the consumer) when the read is |
50 | 56 | finished.""" |
51 | 57 | self._maybe_create_download_node() |
| 58 | actual_size = size |
| 59 | if actual_size is None: |
| 60 | actual_size = self._verifycap.size - offset |
| 61 | read_ev = self._download_status.add_read_event(offset, actual_size, |
| 62 | now()) |
| 63 | if IDownloadStatusHandlingConsumer.providedBy(consumer): |
| 64 | consumer.set_download_status_read_event(read_ev) |
52 | 65 | return self._node.read(consumer, offset, size, read_ev) |
53 | 66 | |
54 | 67 | def get_segment(self, segnum): |
… |
… |
class CiphertextFileNode: |
155 | 168 | monitor=monitor) |
156 | 169 | return v.start() |
157 | 170 | |
158 | | |
159 | 171 | class DecryptingConsumer: |
160 | 172 | """I sit between a CiphertextDownloader (which acts as a Producer) and |
161 | 173 | the real Consumer, decrypting everything that passes by. The real |
162 | 174 | Consumer sees the real Producer, but the Producer sees us instead of the |
163 | 175 | real consumer.""" |
164 | | implements(IConsumer) |
| 176 | implements(IConsumer, IDownloadStatusHandlingConsumer) |
165 | 177 | |
166 | | def __init__(self, consumer, readkey, offset, read_event): |
| 178 | def __init__(self, consumer, readkey, offset): |
167 | 179 | self._consumer = consumer |
168 | | self._read_event = read_event |
| 180 | self._read_event = None |
169 | 181 | # TODO: pycryptopp CTR-mode needs random-access operations: I want |
170 | 182 | # either a=AES(readkey, offset) or better yet both of: |
171 | 183 | # a=AES(readkey, offset=0) |
… |
… |
class DecryptingConsumer: |
177 | 189 | self._decryptor = AES(readkey, iv=iv) |
178 | 190 | self._decryptor.process("\x00"*offset_small) |
179 | 191 | |
| 192 | def set_download_status_read_event(self, read_ev): |
| 193 | self._read_event = read_ev |
| 194 | |
180 | 195 | def registerProducer(self, producer, streaming): |
181 | 196 | # this passes through, so the real consumer can flow-control the real |
182 | 197 | # producer. Therefore we don't need to provide any IPushProducer |
… |
… |
class DecryptingConsumer: |
188 | 203 | def write(self, ciphertext): |
189 | 204 | started = now() |
190 | 205 | plaintext = self._decryptor.process(ciphertext) |
191 | | elapsed = now() - started |
192 | | self._read_event.update(0, elapsed, 0) |
| 206 | if self._read_event: |
| 207 | elapsed = now() - started |
| 208 | self._read_event.update(0, elapsed, 0) |
193 | 209 | self._consumer.write(plaintext) |
194 | 210 | |
195 | 211 | class ImmutableFileNode: |
… |
… |
class ImmutableFileNode: |
200 | 216 | history): |
201 | 217 | assert isinstance(filecap, uri.CHKFileURI) |
202 | 218 | verifycap = filecap.get_verify_cap() |
203 | | ds = DownloadStatus(verifycap.storage_index, verifycap.size) |
204 | | if history: |
205 | | history.add_download(ds) |
206 | | self._download_status = ds |
207 | 219 | self._cnode = CiphertextFileNode(verifycap, storage_broker, |
208 | | secret_holder, terminator, history, ds) |
| 220 | secret_holder, terminator, history) |
209 | 221 | assert isinstance(filecap, uri.CHKFileURI) |
210 | 222 | self.u = filecap |
211 | 223 | self._readkey = filecap.key |
… |
… |
class ImmutableFileNode: |
226 | 238 | return True |
227 | 239 | |
228 | 240 | def read(self, consumer, offset=0, size=None): |
229 | | actual_size = size |
230 | | if actual_size == None: |
231 | | actual_size = self.u.size |
232 | | actual_size = actual_size - offset |
233 | | read_ev = self._download_status.add_read_event(offset,actual_size, |
234 | | now()) |
235 | | decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev) |
236 | | d = self._cnode.read(decryptor, offset, size, read_ev) |
| 241 | decryptor = DecryptingConsumer(consumer, self._readkey, offset) |
| 242 | d = self._cnode.read(decryptor, offset, size) |
237 | 243 | d.addCallback(lambda dc: consumer) |
238 | 244 | return d |
239 | 245 | |