1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | from __future__ import annotations |
---|
5 | |
---|
6 | import random |
---|
7 | |
---|
8 | from zope.interface import implementer |
---|
9 | from twisted.internet import defer, reactor |
---|
10 | from foolscap.api import eventually |
---|
11 | |
---|
12 | from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \ |
---|
13 | NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \ |
---|
14 | IMutableFileVersion, IWriteable |
---|
15 | from allmydata.util import hashutil, log, consumer, deferredutil, mathutil |
---|
16 | from allmydata.util.assertutil import precondition |
---|
17 | from allmydata.util.cputhreadpool import defer_to_thread |
---|
18 | from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \ |
---|
19 | WriteableMDMFFileURI, ReadonlyMDMFFileURI |
---|
20 | from allmydata.monitor import Monitor |
---|
21 | from allmydata.mutable.publish import Publish, MutableData,\ |
---|
22 | TransformingUploadable |
---|
23 | from allmydata.mutable.common import ( |
---|
24 | MODE_READ, |
---|
25 | MODE_WRITE, |
---|
26 | MODE_CHECK, |
---|
27 | UnrecoverableFileError, |
---|
28 | UncoordinatedWriteError, |
---|
29 | derive_mutable_keys, |
---|
30 | ) |
---|
31 | from allmydata.mutable.servermap import ServerMap, ServermapUpdater |
---|
32 | from allmydata.mutable.retrieve import Retrieve |
---|
33 | from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer |
---|
34 | from allmydata.mutable.repairer import Repairer |
---|
35 | |
---|
36 | |
---|
37 | class BackoffAgent(object): |
---|
38 | # these parameters are copied from foolscap.reconnector, which gets them |
---|
39 | # from twisted.internet.protocol.ReconnectingClientFactory |
---|
40 | initialDelay = 1.0 |
---|
41 | factor = 2.7182818284590451 # (math.e) |
---|
42 | jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole |
---|
43 | maxRetries = 4 |
---|
44 | |
---|
45 | def __init__(self): |
---|
46 | self._delay = self.initialDelay |
---|
47 | self._count = 0 |
---|
48 | def delay(self, node, f): |
---|
49 | self._count += 1 |
---|
        if self._count >= self.maxRetries:
            return f
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
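
# A minimal sketch (illustrative; not part of this module's API) of a
# custom backoffer suitable for passing as the backoffer= argument of
# MutableFileVersion.modify() below; the fixed two-second delay is an
# arbitrary assumption:
#
#   def fixed_backoffer(node, failure):
#       d = defer.Deferred()
#       reactor.callLater(2.0, d.callback, None)  # always wait 2 seconds
#       return d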

# use nodemaker.create_mutable_file() to make one of these
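#
# A hedged usage sketch (the `client.nodemaker` attribute and the exact
# create_mutable_file() call shown here are assumptions for illustration):
#
#   d = client.nodemaker.create_mutable_file(MutableData(b"initial"))
#   d.addCallback(lambda node: node.download_best_version())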

@implementer(IMutableFileNode, ICheckable)
class MutableFileNode(object):

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._most_recent_size = None
        # filled in after __init__ if we're being created for the first time;
        # filled in by the servermap updater before publishing, otherwise.
        # set to this default value in case neither of those things happen,
        # or in case the servermap can't find any shares to tell us what
        # to publish as.
        self._protocol_version = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

        # Starting with MDMF, we can get these from caps if they're
        # there. Leave them alone for now; they'll be filled in by my
        # init_from_cap method if necessary.
        self._downloader_hints = {}

    def __repr__(self):
        if hasattr(self, '_uri'):
99 | return "<%s %x %s %r>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev()) |
---|
100 | else: |
---|
101 | return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None) |
---|

    def init_from_cap(self, filecap):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
            self._protocol_version = MDMF_VERSION
        elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
            self._protocol_version = SDMF_VERSION

        self._uri = filecap
        self._writekey = None

        if not filecap.is_readonly() and filecap.is_mutable():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None

        return self

    @deferredutil.async_to_deferred
    async def create_with_keys(self, keypair, contents,
                               version=SDMF_VERSION):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file() ). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = keypair
        self._writekey, self._encprivkey, self._fingerprint = await defer_to_thread(
            derive_mutable_keys, keypair
        )
        if version == MDMF_VERSION:
            self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        elif version == SDMF_VERSION:
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return await self._upload(initial_contents, None)

    def _get_initial_contents(self, contents):
        if contents is None:
            return MutableData(b"")

        if isinstance(contents, bytes):
            return MutableData(contents)

        if IMutableUploadable.providedBy(contents):
            return contents

        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)
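
    # A quick reference for the `contents` forms accepted above (an
    # illustrative sketch; the callable form is invoked with this node and
    # must return an IMutableUploadable):
    #
    #   self._get_initial_contents(None)                  # empty file
    #   self._get_initial_contents(b"data")               # raw bytes
    #   self._get_initial_contents(MutableData(b"data"))  # uploadable, as-is
    #   self._get_initial_contents(lambda node: MutableData(b"data"))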

    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey

    def get_write_enabler(self, server):
        seed = server.get_foolscap_write_enabler_seed()
        assert len(seed) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, seed)
    def get_renewal_secret(self, server):
        crs = self._secret_holder.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        lease_seed = server.get_lease_seed()
        assert len(lease_seed) == 20
        return hashutil.bucket_renewal_secret_hash(frs, lease_seed)
    def get_cancel_secret(self, server):
        ccs = self._secret_holder.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        lease_seed = server.get_lease_seed()
        assert len(lease_seed) == 20
        return hashutil.bucket_cancel_secret_hash(fcs, lease_seed)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    ####################################
    # IFilesystemNode

    def get_size(self):
        return self._most_recent_size

    def get_current_size(self):
        d = self.get_size_of_best_version()
        d.addCallback(self._stash_size)
        return d

    def _stash_size(self, size):
        self._most_recent_size = size
        return size

    def get_cap(self):
        return self._uri
    def get_readcap(self):
        return self._uri.get_readonly()
    def get_verify_cap(self):
        return self._uri.get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
            return None
        return self._uri

    def get_uri(self):
        return self._uri.to_string()

    def get_write_uri(self):
        if self.is_readonly():
            return None
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_readonly(self):
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        return ro

    def is_mutable(self):
        return self._uri.is_mutable()

    def is_readonly(self):
        return self._uri.is_readonly()

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return not self._uri.is_mutable()

    def raise_error(self):
        pass

    def __hash__(self):
        return hash((self.__class__, self._uri))

    def __eq__(self, them):
        if type(self) != type(them):
            return False
        return self._uri == them._uri

    def __ne__(self, them):
        return not (self == them)

    #################################
    # ICheckable

    def check(self, monitor, verify=False, add_lease=False):
        checker = MutableChecker(self, self._storage_broker,
                                 self._history, monitor)
        return checker.check(verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        checker = MutableCheckAndRepairer(self, self._storage_broker,
                                          self._history, monitor)
        return checker.check(verify, add_lease)

    #################################
    # IRepairable

    def repair(self, check_results, force=False, monitor=None):
        assert ICheckResults(check_results)
        r = Repairer(self, check_results, self._storage_broker,
                     self._history, monitor)
        d = r.start(force)
        return d


    #################################
    # IFileNode

    def get_best_readable_version(self):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent
        """
        return self.get_readable_version()


    def get_readable_version(self, servermap=None, version=None):
        """
        I return a Deferred that fires with a MutableFileVersion for my
        version argument, if there is a recoverable file of that version
        on the grid. If there is no recoverable version, I errback with an
        UnrecoverableFileError.

        If a servermap is provided, I look in there for the requested
        version. If no servermap is provided, I create and update a new
        one.

        If no version is provided, then I return a MutableFileVersion
        representing the best recoverable version of the file.
        """
        d = self._get_version_from_servermap(MODE_READ, servermap, version)
        def _build_version(servermap_and_their_version):
            (servermap, their_version) = servermap_and_their_version
            assert their_version in servermap.recoverable_versions()
            assert their_version in servermap.make_versionmap()

            mfv = MutableFileVersion(self,
                                     servermap,
                                     their_version,
                                     self._storage_index,
                                     self._storage_broker,
                                     self._readkey,
                                     history=self._history)
            assert mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
            # our caller can use this to download the contents of the
            # mutable file.
            return mfv
        return d.addCallback(_build_version)


    def _get_version_from_servermap(self,
                                    mode,
                                    servermap=None,
                                    version=None):
        """
        I return a Deferred that fires with (servermap, version).

        This function performs validation and a servermap update. If it
        returns (servermap, version), the caller can assume that:
          - servermap was last updated in mode.
          - version is recoverable, and corresponds to the servermap.

        If version and servermap are provided to me, I will validate
        that version exists in the servermap, and that the servermap was
        updated correctly.

        If version is not provided, but servermap is, I will validate
        the servermap and return the best recoverable version that I can
        find in the servermap.

        If the version is provided but the servermap isn't, I will
        obtain a servermap that has been updated in the correct mode and
        validate that version is found and recoverable.

        If neither servermap nor version are provided, I will obtain a
        servermap updated in the correct mode, and return the best
        recoverable version that I can find in there.
        """
        # XXX: wording ^^^^
        if servermap and servermap.get_last_update()[0] == mode:
            d = defer.succeed(servermap)
        else:
            d = self._get_servermap(mode)

        def _get_version(servermap, v):
            if v and v not in servermap.recoverable_versions():
                v = None
            elif not v:
                v = servermap.best_recoverable_version()
            if not v:
                raise UnrecoverableFileError("no recoverable versions")

            return (servermap, v)
        return d.addCallback(_get_version, version)


    def download_best_version(self):
        """
        I return a Deferred that fires with the contents of the best
        version of this mutable file.
        """
        return self._do_serialized(self._download_best_version)


    def _download_best_version(self):
        """
        I am the serialized sibling of download_best_version.
        """
        d = self.get_best_readable_version()
        d.addCallback(self._record_size)
        d.addCallback(lambda version: version.download_to_data())

        # It is possible that the download will fail because there
        # aren't enough shares to be had. If so, we will try again after
        # updating the servermap in MODE_WRITE, which may find more
        # shares than updating in MODE_READ, as we just did. We can do
        # this by getting the best mutable version and downloading from
        # that -- the best mutable version will be a MutableFileVersion
        # with a servermap that was last updated in MODE_WRITE, as we
        # want. If this fails, then we give up.
        def _maybe_retry(failure):
            failure.trap(NotEnoughSharesError)

            d = self.get_best_mutable_version()
            d.addCallback(self._record_size)
            d.addCallback(lambda version: version.download_to_data())
            return d

        d.addErrback(_maybe_retry)
        return d


    def _record_size(self, mfv):
        """
        I record the size of a mutable file version.
        """
        self._most_recent_size = mfv.get_size()
        return mfv


    def get_size_of_best_version(self):
        """
        I return the size of the best version of this mutable file.

        This is equivalent to calling get_size() on the result of
        get_best_readable_version().
        """
        d = self.get_best_readable_version()
        return d.addCallback(lambda mfv: mfv.get_size())


    #################################
    # IMutableFileNode

    def get_best_mutable_version(self, servermap=None):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent. I am like get_best_readable_version, except that I
        will try to make a writeable version if I can.
        """
        return self.get_mutable_version(servermap=servermap)


    def get_mutable_version(self, servermap=None, version=None):
        """
        I return a version of this mutable file. I return a Deferred
        that fires with a MutableFileVersion

        If version is provided, the Deferred will fire with a
        MutableFileVersion initialized with that version. Otherwise, it
        will fire with the best version that I can recover.

        If servermap is provided, I will use that to find versions
        instead of performing my own servermap update.
        """
        if self.is_readonly():
            return self.get_readable_version(servermap=servermap,
                                             version=version)

        # get_mutable_version => write intent, so we require that the
        # servermap is updated in MODE_WRITE
        d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
        def _build_version(servermap_and_smap_version):
            # these should have been set by the servermap update.
            (servermap, smap_version) = servermap_and_smap_version
            assert self._secret_holder
            assert self._writekey

            mfv = MutableFileVersion(self,
                                     servermap,
                                     smap_version,
                                     self._storage_index,
                                     self._storage_broker,
                                     self._readkey,
                                     self._writekey,
                                     self._secret_holder,
                                     history=self._history)
            assert not mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
            return mfv

        return d.addCallback(_build_version)


    # XXX: I'm uncomfortable with the difference between upload and
    #      overwrite, which, FWICT, is basically that you don't have to
    #      do a servermap update before you overwrite. We split them up
    #      that way anyway, so I guess there's no real difficulty in
    #      offering both ways to callers, but it also makes the
    #      public-facing API cluttery, and makes it hard to discern the
    #      right way of doing things.

    # In general, we leave it to callers to ensure that they aren't
    # going to cause UncoordinatedWriteErrors when working with
    # MutableFileVersions. We know that the next three operations
    # (upload, overwrite, and modify) will all operate on the same
    # version, so we say that only one of them can be going on at once,
    # and serialize them to ensure that that actually happens, since as
    # the caller in this situation it is our job to do that.
    def overwrite(self, new_contents):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents. This is equivalent to calling
        overwrite on the result of get_best_mutable_version with
        new_contents as an argument. I return a Deferred that eventually
        fires with the results of my replacement process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._overwrite, new_contents)


    def _overwrite(self, new_contents):
        """
        I am the serialized sibling of overwrite.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.overwrite(new_contents))
        d.addCallback(self._did_upload, new_contents.get_size())
        return d


    def upload(self, new_contents, servermap):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents, using servermap instead of
        creating/updating our own servermap. I return a Deferred that
        fires with the results of my upload.
        """
        # TODO: Update downloader hints
        return self._do_serialized(self._upload, new_contents, servermap)


    def modify(self, modifier, backoffer=None):
        """
        I modify the contents of the best recoverable version of this
        mutable file with the modifier. This is equivalent to calling
        modify on the result of get_best_mutable_version. I return a
        Deferred that eventually fires with an UploadResults instance
        describing this process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._modify, modifier, backoffer)


    def _modify(self, modifier, backoffer):
        """
        I am the serialized sibling of modify.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
        return d


    def download_version(self, servermap, version, fetch_privkey=False):
        """
        Download the specified version of this mutable file. I return a
        Deferred that fires with the contents of the specified version
        as a bytestring, or errbacks if the file is not recoverable.
        """
        d = self.get_readable_version(servermap, version)
        return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))


    def get_servermap(self, mode):
        """
        I return a servermap that has been updated in mode.

        mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
        MODE_ANYTHING. See servermap.py for more on what these mean.
        """
        return self._do_serialized(self._get_servermap, mode)


    def _get_servermap(self, mode):
        """
        I am a serialized twin to get_servermap.
        """
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode)
        # The servermap will tell us about the most recent size of the
        # file, so we may as well set that so that callers might get
        # more data about us.
        if not self._most_recent_size:
            d.addCallback(self._get_size_from_servermap)
        return d


    def _get_size_from_servermap(self, servermap):
        """
        I extract the size of the best version of this file and record
        it in self._most_recent_size. I return the servermap that I was
        given.
        """
        if servermap.recoverable_versions():
            v = servermap.best_recoverable_version()
            size = v[4] # verinfo[4] == size
            self._most_recent_size = size
        return servermap


    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
                             mode)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        return u.update()


    #def set_version(self, version):
        # I can be set in two ways:
        #  1. When the node is created.
        #  2. (for an existing share) when the Servermap is updated
        #     before I am read.
    #    assert version in (MDMF_VERSION, SDMF_VERSION)
    #    self._protocol_version = version


    def get_version(self):
        return self._protocol_version


    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MFN instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d
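
    # The serializer pattern above, reduced to a standalone sketch (the
    # names here are illustrative, not part of this module):
    #
    #   serializer = defer.succeed(None)
    #   def run_serialized(f, *args):
    #       d = defer.Deferred()
    #       serializer.addCallback(lambda _ignored: f(*args))
    #       serializer.addBoth(lambda res: eventually(d.callback, res))
    #       return d
    #
    # Each call appends to one shared Deferred chain, so at most one f()
    # runs at a time; eventually() fires d.callback on a later reactor
    # turn to avoid reentrancy in the Deferred machinery.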


    def _upload(self, new_contents, servermap):
        """
        A MutableFileNode still has to have some way of getting
        published initially, which is what I am here for. After that,
        all publishing, updating, modifying and so on happens through
        MutableFileVersions.
        """
        assert self._pubkey, "update_servermap must be called before publish"

        # Define IPublishInvoker with a set_downloader_hints method?
        # Then have the publisher call that method when it's done publishing?
        p = Publish(self, self._storage_broker, servermap)
        if self._history:
            self._history.notify_publish(p.get_status(),
                                         new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())
        return d


    def set_downloader_hints(self, hints):
        self._downloader_hints = hints

    def _did_upload(self, res, size):
        self._most_recent_size = size
        return res


@implementer(IMutableFileVersion, IWriteable)
class MutableFileVersion(object):
    """
    I represent a specific version (most likely the best version) of a
    mutable file.

    Since I implement IReadable, instances which hold a
    reference to an instance of me are guaranteed the ability (absent
    connection difficulties or unrecoverable versions) to read the file
    that I represent. Depending on whether I was initialized with a
    write capability or not, I may also provide callers the ability to
    overwrite or modify the contents of the mutable file that I
    reference.
    """

    def __init__(self,
                 node,
                 servermap,
                 version,
                 storage_index,
                 storage_broker,
                 readcap,
                 writekey=None,
                 write_secrets=None,
                 history=None):

        self._node = node
        self._servermap = servermap
        self._version = version
        self._storage_index = storage_index
        self._write_secrets = write_secrets
        self._history = history
        self._storage_broker = storage_broker

        #assert isinstance(readcap, IURI)
        self._readcap = readcap

        self._writekey = writekey
        self._serializer = defer.succeed(None)


    def get_sequence_number(self):
        """
        Get the sequence number of the mutable version that I represent.
        """
        return self._version[0] # verinfo[0] == the sequence number


    # TODO: Terminology?
    def get_writekey(self):
        """
        I return a writekey or None if I don't have a writekey.
        """
        return self._writekey


    def set_downloader_hints(self, hints):
        """
        I set the downloader hints.
        """
        assert isinstance(hints, dict)

        self._downloader_hints = hints


    def get_downloader_hints(self):
        """
        I return the downloader hints.
        """
        return self._downloader_hints


    def overwrite(self, new_contents):
        """
        I overwrite the contents of this mutable file version with the
        data in new_contents.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._overwrite, new_contents)


    def _overwrite(self, new_contents):
        assert IMutableUploadable.providedBy(new_contents)
        assert self._servermap.get_last_update()[0] == MODE_WRITE

        return self._upload(new_contents)


    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         first_time = True
         while True:
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old, servermap, first_time)
           first_time = False
           if new == old: break
           try:
             publish(new)
           except UncoordinatedWriteError as e:
             backoffer(e)
             continue
           break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to keep competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._modify, modifier, backoffer)
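
    # A hedged example of a modifier callback for modify() above (purely
    # illustrative): append a line only if it is missing, so the delta is
    # idempotent across retries.
    #
    #   def add_line(old_contents, servermap, first_time):
    #       line = b"new entry\n"
    #       if line in old_contents:
    #           return old_contents  # delta already applied; no change
    #       return old_contents + line
    #
    #   d = version.modify(add_line)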


    def _modify(self, modifier, backoffer):
        if backoffer is None:
            backoffer = BackoffAgent().delay
        return self._modify_and_retry(modifier, backoffer, True)


    def _modify_and_retry(self, modifier, backoffer, first_time):
        """
        I try to apply modifier to the contents of this version of the
        mutable file. If I succeed, I return an UploadResults instance
        describing my success. If I fail, I try again after waiting for
        a little bit.
        """
        log.msg("doing modify")
        if first_time:
            d = self._update_servermap()
        else:
            # We ran into trouble; do MODE_CHECK so we're a little more
            # careful on subsequent tries.
            d = self._update_servermap(mode=MODE_CHECK)

        d.addCallback(lambda ignored:
            self._modify_once(modifier, first_time))
        def _retry(f):
            f.trap(UncoordinatedWriteError)
            # Uh oh, it broke. We're allowed to trust the servermap for our
            # first try, but after that we need to update it. It's
            # possible that we've failed due to a race with another
            # uploader, and if the race is to converge correctly, we
            # need to know about that upload.
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(modifier,
                                                  backoffer, False))
            return d2
        d.addErrback(_retry)
        return d


    def _modify_once(self, modifier, first_time):
        """
        I attempt to apply a modifier to the contents of the mutable
        file.
        """
        assert self._servermap.get_last_update()[0] != MODE_READ

        # download_to_data is serialized, so we have to call this to
        # avoid deadlock.
        d = self._try_to_download_data()
        def _apply(old_contents):
            new_contents = modifier(old_contents, self._servermap, first_time)
            precondition((isinstance(new_contents, bytes) or
                          new_contents is None),
                         "Modifier function must return bytes "
                         "or None")

            if new_contents is None or new_contents == old_contents:
                log.msg("no changes")
                # no changes need to be made
                if first_time:
                    return
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we manage an uncontested publish.
                old_uploadable = MutableData(old_contents)
                new_contents = old_uploadable
            else:
                new_contents = MutableData(new_contents)

            return self._upload(new_contents)
        d.addCallback(_apply)
        return d


    def is_readonly(self):
        """
        I return True if this MutableFileVersion provides no write
        access to the file that it encapsulates, and False if it
        provides the ability to modify the file.
        """
        return self._writekey is None


    def is_mutable(self):
        """
        I return True, since mutable files are always mutable by
        somebody.
        """
        return True


    def get_storage_index(self):
        """
        I return the storage index of the reference that I encapsulate.
        """
        return self._storage_index


    def get_size(self):
        """
        I return the length, in bytes, of this readable object.
        """
        return self._servermap.size_of_version(self._version)


    def download_to_data(self, fetch_privkey=False):  # type: ignore # fixme
        """
        I return a Deferred that fires with the contents of this
        readable object as a byte string.

        """
        c = consumer.MemoryConsumer()
        d = self.read(c, fetch_privkey=fetch_privkey)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        return d


    def _try_to_download_data(self):
        """
        I am an unserialized cousin of download_to_data; I am called
        from the children of modify() to download the data associated
        with this mutable version.
        """
        c = consumer.MemoryConsumer()
        # modify will almost certainly write, so we need the privkey.
        d = self._read(c, fetch_privkey=True)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        return d


    def read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I read a portion (possibly all) of the mutable file that I
        reference into consumer.
        """
        return self._do_serialized(self._read, consumer, offset, size,
                                   fetch_privkey)
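
    # A partial-read sketch (illustrative): read 100 bytes starting at
    # offset 10 into an in-memory consumer.
    #
    #   c = consumer.MemoryConsumer()
    #   d = version.read(c, offset=10, size=100)
    #   d.addCallback(lambda mc: b"".join(mc.chunks))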


    def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I am the serialized companion of read.
        """
        r = Retrieve(self._node, self._storage_broker, self._servermap,
                     self._version, fetch_privkey)
        if self._history:
            self._history.notify_retrieve(r.get_status())
        d = r.download(consumer, offset, size)
        return d


    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MFN instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d


    def _upload(self, new_contents):
        #assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self._node, self._storage_broker, self._servermap)
        if self._history:
            self._history.notify_publish(p.get_status(),
                                         new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())
        return d


    def _did_upload(self, res, size):
        self._most_recent_size = size
        return res

    def update(self, data, offset):
        """
        Do an update of this mutable file version by inserting data at
        offset within the file. If offset is the EOF, this is an append
        operation. I return a Deferred that fires with the results of
        the update operation when it has completed.

        In cases where update does not append any data, or where it does
        not append so many blocks that the block count crosses a
        power-of-two boundary, this operation will use roughly
        O(data.get_size()) memory/bandwidth/CPU to perform the update.
        Otherwise, it must download, re-encode, and upload the entire
        file again, which will use O(filesize) resources.
        """
        return self._do_serialized(self._update, data, offset)
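
    # An append sketch (illustrative): writing at the current end-of-file
    # appends, per the docstring above.
    #
    #   new_data = MutableData(b"appended bytes")
    #   d = version.update(new_data, version.get_size())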


    def _update(self, data, offset):
        """
        I update the mutable file version represented by this particular
        IMutableVersion by inserting the data in data at the offset
        offset. I return a Deferred that fires when this has been
        completed.
        """
        new_size = data.get_size() + offset
        old_size = self.get_size()
        segment_size = self._version[3]
        num_old_segments = mathutil.div_ceil(old_size,
                                             segment_size)
        num_new_segments = mathutil.div_ceil(new_size,
                                             segment_size)
        log.msg("got %d old segments, %d new segments" % \
                        (num_old_segments, num_new_segments))

        # We do a whole file re-encode if the file is an SDMF file.
        if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
            log.msg("doing re-encode instead of in-place update")
            return self._do_modify_update(data, offset)

        # Otherwise, we can replace just the parts that are changing.
        log.msg("updating in place")
        d = self._do_update_update(data, offset)
        d.addCallback(self._decode_and_decrypt_segments, data, offset)
        d.addCallback(self._build_uploadable_and_finish, data, offset)
        return d


    def _do_modify_update(self, data, offset):
        """
        I perform a file update by modifying the contents of the file
        after downloading it, then reuploading it. I am less efficient
        than _do_update_update, but am necessary for certain updates.
        """
        def m(old, servermap, first_time):
            start = offset
            rest = offset + data.get_size()
            new = old[:start]
            new += b"".join(data.read(data.get_size()))
            new += old[rest:]
            return new
        return self._modify(m, None)


    def _do_update_update(self, data, offset):
        """
        I start the Servermap update that gets us the data we need to
        continue the update process. I return a Deferred that fires when
        the servermap update is done.
        """
        assert IMutableUploadable.providedBy(data)
        assert self.is_mutable()
        # offset == self.get_size() is valid and means that we are
        # appending data to the file.
        assert offset <= self.get_size()

        segsize = self._version[3]
        # We'll need the segment that the data starts in, regardless of
        # what we'll do later.
        start_segment = offset // segsize

        # We only need the end segment if the data we append does not go
        # beyond the current end-of-file.
        end_segment = start_segment
        if offset + data.get_size() < self.get_size():
            end_data = offset + data.get_size()
            # The last byte we touch is the end_data'th byte, which is
            # actually byte end_data - 1 because bytes are zero-indexed.
            end_data -= 1
            end_segment = end_data // segsize

        self._start_segment = start_segment
        self._end_segment = end_segment

        # Now ask for the servermap to be updated in MODE_WRITE with
        # this update range.
        return self._update_servermap(update_range=(start_segment,
                                                    end_segment))


    def _decode_and_decrypt_segments(self, ignored, data, offset):
        """
        After the servermap update, I take the encrypted and encoded
        data that the servermap fetched while doing its update and
        transform it into decoded-and-decrypted plaintext that can be
        used by the new uploadable. I return a Deferred that fires with
        the segments.
        """
        r = Retrieve(self._node, self._storage_broker, self._servermap,
                     self._version)
        # decode: takes in our blocks and salts from the servermap,
        # returns a Deferred that fires with the corresponding plaintext
        # segments. Does not download -- simply takes advantage of
        # existing infrastructure within the Retrieve class to avoid
        # duplicating code.
        sm = self._servermap
        # XXX: If the methods in the servermap don't work as
        # abstractions, you should rewrite them instead of going around
        # them.
        update_data = sm.update_data
        start_segments = {} # shnum -> start segment
        end_segments = {} # shnum -> end segment
        blockhashes = {} # shnum -> blockhash tree
        for (shnum, original_data) in list(update_data.items()):
            data = [d[1] for d in original_data if d[0] == self._version]
            # data is [(blockhashes,start,end)..]

            # Every data entry in our list should now be share shnum for
            # a particular version of the mutable file, so all of the
            # entries should be identical.
            datum = data[0]
            assert [x for x in data if x != datum] == []

            # datum is (blockhashes,start,end)
            blockhashes[shnum] = datum[0]
            start_segments[shnum] = datum[1] # (block,salt) bytestrings
            end_segments[shnum] = datum[2]

        d1 = r.decode(start_segments, self._start_segment)
        d2 = r.decode(end_segments, self._end_segment)
        d3 = defer.succeed(blockhashes)
        return deferredutil.gatherResults([d1, d2, d3])


    def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
        """
        After the process has the plaintext segments, I build the
        TransformingUploadable that the publisher will eventually
        re-upload to the grid. I then invoke the publisher with that
        uploadable, and return a Deferred that fires when the publish
        operation has completed without issue.
        """
        u = TransformingUploadable(data, offset,
                                   self._version[3],
                                   segments_and_bht[0],
                                   segments_and_bht[1])
        p = Publish(self._node, self._storage_broker, self._servermap)
        return p.update(u, offset, segments_and_bht[2], self._version)


    def _update_servermap(self, mode=MODE_WRITE, update_range=None):
        """
        I update the servermap. I return a Deferred that fires when the
        servermap update is done.
        """
        if update_range:
            u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
                                 self._servermap,
                                 mode=mode,
                                 update_range=update_range)
        else:
            u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
                                 self._servermap,
                                 mode=mode)
        return u.update()

    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3562
    def get_servermap(self):
        raise NotImplementedError